Deduplicating Hundreds of Millions of Rows on Multiple Unique Fields (A Concrete Java Implementation)


Deduplicating hundreds of millions of database rows on multiple unique fields
  • 1. The problem
  • 2. Possible scenarios
  • 3. Goals
  • 4. Solution
    • 4.1 config/druid.properties
    • 4.2 Multi-threaded table scan and split
    • 4.3 Multi-threaded file reads and dedup
  • 5. Throughput




1. The problem

The approach below targets the combination of four conditions:

  1. Massive data volume
  2. A single standalone table
  3. Uniqueness defined over several fields together
  4. Lots of duplicate and junk rows
2. Possible scenarios
  1. A legacy table was created without a unique constraint on the relevant fields
  2. Table data migration
3. Goals
  1. Fast
  2. Accurate
  3. Free of thread-safety problems
4. Solution

4.1 config/druid.properties
# For MySQL use driverClassName:com.mysql.cj.jdbc.Driver
driverClassName:oracle.jdbc.OracleDriver
# For MySQL the URL format is jdbc:mysql://host:port/databaseName
url:jdbc:oracle:thin:@host:port:databaseName
password:
# dbSource:
username:
maxActive:5
initialSize:5
maxWait:600000
minIdle:1
#maxIdle:15

timeBetweenEvictionRunsMillis:60000
minEvictableIdleTimeMillis:30000

# Oracle validation query; on MySQL use SELECT 1 instead
validationQuery:SELECT 1 from dual
testWhileIdle:true
testOnBorrow:false
testOnReturn:false
#poolPreparedStatements:true
maxOpenPreparedStatements:100
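
Before wiring this file into the tool in 4.2, it can be worth verifying the pool by hand. The following is a minimal smoke-test sketch, not part of the original post: it assumes the config path above and the DruidDataSourceFactory API from the druid 1.1.x dependency declared in 4.2, and PoolSmokeTest is just an illustrative name.

import com.alibaba.druid.pool.DruidDataSourceFactory;

import javax.sql.DataSource;
import java.io.FileInputStream;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

/**
 * Loads the properties file above into a Druid pool and runs the
 * validation query once by hand.
 */
public class PoolSmokeTest {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // same path that HashSplitTool.init() in 4.2 loads
        try (InputStream in = new FileInputStream("config/druid.properties")) {
            props.load(in);
        }
        DataSource ds = DruidDataSourceFactory.createDataSource(props);
        try (Connection conn = ds.getConnection();
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("SELECT 1 FROM dual")) {
            System.out.println("pool ok: " + rs.next());
        }
    }
}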

4.2 Multi-threaded table scan and split
package service.jmw;

/**
 * Maven dependencies:
 *
 * <dependency>
 *     <groupId>com.alibaba</groupId>
 *     <artifactId>druid</artifactId>
 *     <version>1.1.21</version>
 * </dependency>
 *
 * <dependency>
 *     <groupId>commons-io</groupId>
 *     <artifactId>commons-io</artifactId>
 *     <version>2.6</version>
 * </dependency>
 *
 * <dependency>
 *     <groupId>org.slf4j</groupId>
 *     <artifactId>slf4j-api</artifactId>
 * </dependency>
 *
 * The imports below also need com.alibaba:fastjson, and section 4.3 uses
 * org.apache.commons:commons-lang3; their versions were not given in the original.
 */

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidDataSourceFactory;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Preparation:
 * 1. Create the database connection pool
 * 2. Create a custom thread pool
 *
 * Dedup procedure:
 * 1. Start several threads that read the database, each covering a different indexed time range
 * 2. Spread the rows across 64 bucket files, keyed by MD5 hashCode modulo 64
 * 3. Read the bucket files with multiple threads and run the dedup logic (section 4.3)
 *
 * @author JMW
 */
public class HashSplitTool implements Runnable {

    /**
     * Split entry point
     */
    public static void main(String[] args) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("Sql", "the SELECT statement for one time range");
        jsonObject.put("FileInput", "directory the bucket files are written to");
        jsonObject.put("FileOutput", "");
        try {
            HashSplitTool.init();
        } catch (Exception e) {
            e.printStackTrace();
        }
        ThreadPoolExecutor threadPoolExecutor = HashSplitTool.poolExecutor;
        try {
            // one task per time-range query; enable more as needed
            threadPoolExecutor.execute(new HashSplitTool(jsonObject));
            threadPoolExecutor.execute(new HashSplitTool(jsonObject));
            threadPoolExecutor.execute(new HashSplitTool(jsonObject));
            // threadPoolExecutor.execute(new HashSplitTool(jsonObject));
            // threadPoolExecutor.execute(new HashSplitTool(jsonObject));
            // threadPoolExecutor.execute(new HashSplitTool(jsonObject));
            // threadPoolExecutor.execute(new HashSplitTool(jsonObject));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPoolExecutor.shutdown();
        }
    }

    private static final Logger LOGGER = LoggerFactory.getLogger(HashSplitTool.class);

    private static DruidDataSource druidDataSource;

    /**
     * JDBC / Druid initialization
     */
    public static void init() throws Exception {
        try {
            Properties properties = loadPropertiesFile("config/druid.properties");
            LOGGER.info("config/druid.properties read successfully");
            druidDataSource = (DruidDataSource) DruidDataSourceFactory.createDataSource(properties);
        } catch (Exception e) {
            LOGGER.info("failed to load config/druid.properties");
            LOGGER.info(e.getMessage());
        }
    }

    /**
     * Load a properties file from disk
     */
    private static Properties loadPropertiesFile(String fullFile) {
        if (fullFile == null || fullFile.isEmpty()) {
            throw new IllegalArgumentException("Properties file path can not be null: " + fullFile);
        }
        Properties p = new Properties();
        try (InputStream inputStream = new FileInputStream(fullFile)) {
            p.load(inputStream);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return p;
    }

    /**
     * Custom thread pool.
     *
     * maximumPoolSize – the maximum number of threads to allow in the pool
     * keepAliveTime   – when the number of threads is greater than the core, this is the maximum
     *                   time that excess idle threads will wait for new tasks before terminating
     * unit            – the time unit for the keepAliveTime argument
     * workQueue       – the queue used to hold tasks before they are executed; it holds only the
     *                   Runnable tasks submitted by the execute method
     * threadFactory   – the factory to use when the executor creates a new thread
     * handler         – the handler to use when execution is blocked because the thread bounds
     *                   and queue capacities are reached
     *
     * public ThreadPoolExecutor(int corePoolSize,
     *                           int maximumPoolSize,
     *                           long keepAliveTime,
     *                           TimeUnit unit,
     *                           BlockingQueue<Runnable> workQueue,
     *                           ThreadFactory threadFactory,
     *                           RejectedExecutionHandler handler)
     */
    public static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
            2,
            10,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(10),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );

    /** per-task configuration (an instance field, so concurrent tasks do not clobber each other) */
    private final JSONObject jsonObject;

    public HashSplitTool(JSONObject jsonObject) throws Exception {
        this.jsonObject = jsonObject;
        init();
    }

    @Override
    public void run() {
        long start = 0;
        Map<Integer, Set<String>> map = new HashMap<>();
        int count = 0;
        if (jsonObject.size() != 0) {
            String sql = jsonObject.getString("Sql");
            String fileInput = jsonObject.getString("FileInput");
            /*
             * prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability)
             *
             * resultSetType – result set type:
             *   ResultSet.TYPE_FORWARD_ONLY       – the cursor can only move forward
             *   ResultSet.TYPE_SCROLL_INSENSITIVE – scrollable both ways, but later database changes
             *                                       are not reflected in the ResultSet
             *   ResultSet.TYPE_SCROLL_SENSITIVE   – scrollable both ways, tracking database updates
             * resultSetConcurrency – operations allowed on the result set:
             *   ResultSet.CONCUR_READ_ONLY – read only
             *   ResultSet.CONCUR_UPDATABLE – updatable
             * resultSetHoldability – cursor behaviour across transactions:
             *   ResultSet.HOLD_CURSORS_OVER_COMMIT – data stays accessible after commit/rollback
             *   ResultSet.CLOSE_CURSORS_AT_COMMIT  – data is discarded after commit/rollback
             */
            try (DruidPooledConnection connection = druidDataSource.getConnection(2000);
                 PreparedStatement preparedStatement = connection.prepareStatement(sql,
                         ResultSet.TYPE_FORWARD_ONLY,
                         ResultSet.CONCUR_READ_ONLY,
                         ResultSet.CLOSE_CURSORS_AT_COMMIT)) {
                connection.setAutoCommit(false);
                ResultSet resultSet = preparedStatement.executeQuery();
                start = System.currentTimeMillis();
                while (resultSet.next()) {
                    // TODO: one getString per field of the dedup key
                    String ob_object_id = resultSet.getString("OB_OBJECT_ID");
                    String A = resultSet.getString("FIELD_A");
                    String B = resultSet.getString("FIELD_B");
                    String C = resultSet.getString("FIELD_C");
                    MDFive instance = MDFive.getInstance();
                    String md5 = instance.getMD5(A + B + C);
                    int absHash = Math.abs(md5.hashCode());
                    int partition = absHash % 64;
                    if (map.containsKey(partition)) {
                        Set<String> ids = map.get(partition);
                        ids.add(ob_object_id + "#" + md5);
                        if (ids.size() == 1000) {
                            // flush a bucket's buffer every 1000 entries
                            count = count + 1000;
                            try {
                                writeFile(fileInput, partition + "-bucket.txt", ids);
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                            ids.clear();
                        }
                    } else {
                        Set<String> ids = new HashSet<>();
                        ids.add(ob_object_id + "#" + md5);
                        map.put(partition, ids);
                    }
                }
                connection.commit();
                System.out.println("flushing remaining in-memory buffers");
                for (Map.Entry<Integer, Set<String>> entry : map.entrySet()) {
                    Integer partition = entry.getKey();
                    Set<String> ids = entry.getValue();
                    count = count + ids.size();
                    try {
                        writeFile(fileInput, partition + "-bucket.txt", ids);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    ids.clear();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
            // connection and statement are released by try-with-resources;
            // close() below remains for callers that manage the resources by hand
        }
        System.out.println("records processed: " + count);
        long end = System.currentTimeMillis();
        System.out.println((end - start) / 1000.0 + "s");
    }

    private static void writeFile(String path, String fileName, Set<String> ids) throws IOException {
        // create the output directory before opening the stream
        File dir = new File(path);
        if (!dir.exists()) {
            dir.mkdirs();
        }
        File file = new File(path + File.separator + fileName);
        try (BufferedWriter writer = new BufferedWriter(
                new OutputStreamWriter(new FileOutputStream(file, true), "UTF-8"))) {
            for (String id : ids) {
                writer.write(id);
                writer.write("\n");
            }
        }
    }

    /**
     * Return the connection to the pool and close the statement
     */
    public static void close(DruidPooledConnection connection, PreparedStatement preparedStatement) {
        try {
            connection.recycle();
            preparedStatement.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * Static inner class implementing MD5
     */
    static class MDFive {

        /* the four chaining variables */
        private final int A = 0x67452301;
        private final int B = 0xefcdab89;
        private final int C = 0x98badcfe;
        private final int D = 0x10325476;

        /* working copies of A/B/C/D */
        private int Atemp, Btemp, Ctemp, Dtemp;

        /*
         * the constants ti
         * formula: floor(abs(sin(i + 1)) * 2^32)
         */
        private final int K[] = {
                0xd76aa478, 0xe8c7b756, 0x242070db, 0xc1bdceee,
                0xf57c0faf, 0x4787c62a, 0xa8304613, 0xfd469501,
                0x698098d8, 0x8b44f7af, 0xffff5bb1, 0x895cd7be,
                0x6b901122, 0xfd987193, 0xa679438e, 0x49b40821,
                0xf61e2562, 0xc040b340, 0x265e5a51, 0xe9b6c7aa,
                0xd62f105d, 0x02441453, 0xd8a1e681, 0xe7d3fbc8,
                0x21e1cde6, 0xc33707d6, 0xf4d50d87, 0x455a14ed,
                0xa9e3e905, 0xfcefa3f8, 0x676f02d9, 0x8d2a4c8a,
                0xfffa3942, 0x8771f681, 0x6d9d6122, 0xfde5380c,
                0xa4beea44, 0x4bdecfa9, 0xf6bb4b60, 0xbebfbc70,
                0x289b7ec6, 0xeaa127fa, 0xd4ef3085, 0x04881d05,
                0xd9d4d039, 0xe6db99e5, 0x1fa27cf8, 0xc4ac5665,
                0xf4292244, 0x432aff97, 0xab9423a7, 0xfc93a039,
                0x655b59c3, 0x8f0ccc92, 0xffeff47d, 0x85845dd1,
                0x6fa87e4f, 0xfe2ce6e0, 0xa3014314, 0x4e0811a1,
                0xf7537e82, 0xbd3af235, 0x2ad7d2bb, 0xeb86d391};

        /* per-round left-rotation amounts (fixed by the MD5 specification) */
        private final int s[] = {
                7, 12, 17, 22, 7, 12, 17, 22, 7, 12, 17, 22, 7, 12, 17, 22,
                5,  9, 14, 20, 5,  9, 14, 20, 5,  9, 14, 20, 5,  9, 14, 20,
                4, 11, 16, 23, 4, 11, 16, 23, 4, 11, 16, 23, 4, 11, 16, 23,
                6, 10, 15, 21, 6, 10, 15, 21, 6, 10, 15, 21, 6, 10, 15, 21};

        /* reset the chaining variables */
        private void init() {
            Atemp = A;
            Btemp = B;
            Ctemp = C;
            Dtemp = D;
        }

        /* rotate left by s bits; >>> fills the high bits with zeros rather than the sign bit */
        private int shift(int a, int s) {
            return (a << s) | (a >>> (32 - s));
        }

        /* main loop over one 512-bit block */
        private void MainLoop(int M[]) {
            int F, g;
            int a = Atemp;
            int b = Btemp;
            int c = Ctemp;
            int d = Dtemp;
            for (int i = 0; i < 64; i++) {
                if (i < 16) {
                    F = (b & c) | ((~b) & d);
                    g = i;
                } else if (i < 32) {
                    F = (d & b) | ((~d) & c);
                    g = (5 * i + 1) % 16;
                } else if (i < 48) {
                    F = b ^ c ^ d;
                    g = (3 * i + 5) % 16;
                } else {
                    F = c ^ (b | (~d));
                    g = (7 * i) % 16;
                }
                int tmp = d;
                d = c;
                c = b;
                b = b + shift(a + F + K[i] + M[g], s[i]);
                a = tmp;
            }
            Atemp = a + Atemp;
            Btemp = b + Btemp;
            Ctemp = c + Ctemp;
            Dtemp = d + Dtemp;
        }

        /*
         * padding: the padded length must satisfy bits ≡ 448 (mod 512), i.e. bytes ≡ 56 (mod 64);
         * a single 1 bit is appended first (the 0x80 byte below), then zeros, and finally the
         * original length in bits as a little-endian value
         */
        private int[] add(String str) {
            int num = ((str.length() + 8) / 64) + 1;  // groups of 512 bits = 64 bytes
            int strByte[] = new int[num * 16];        // 64 / 4 = 16 ints per group
            for (int i = 0; i < num * 16; i++) {      // zero-initialize
                strByte[i] = 0;
            }
            int i;
            for (i = 0; i < str.length(); i++) {
                // four bytes per int, little-endian
                strByte[i >> 2] |= str.charAt(i) << ((i % 4) * 8);
            }
            strByte[i >> 2] |= 0x80 << ((i % 4) * 8);  // append the 1 bit
            /*
             * append the original length in bits (hence * 8); little-endian, so it goes in the
             * second-to-last int, and only 32 of the 64 length bits are used here
             */
            strByte[num * 16 - 2] = str.length() * 8;
            return strByte;
        }

        /*
         * entry point; synchronized because the singleton's chaining variables are mutable
         * shared state and several splitter threads call this concurrently
         */
        public synchronized String getMD5(String source) {
            init();
            int strByte[] = add(source);
            for (int i = 0; i < strByte.length / 16; i++) {
                int num[] = new int[16];
                for (int j = 0; j < 16; j++) {
                    num[j] = strByte[i * 16 + j];
                }
                MainLoop(num);
            }
            return changeHex(Atemp) + changeHex(Btemp) + changeHex(Ctemp) + changeHex(Dtemp);
        }

        /* convert an int to a little-endian hex string */
        private String changeHex(int a) {
            String str = "";
            for (int i = 0; i < 4; i++) {
                str += String.format("%2s",
                        Integer.toHexString(((a >> i * 8) % (1 << 8)) & 0xff)).replace(' ', '0');
            }
            return str;
        }

        /* singleton */
        private static MDFive instance;

        public static synchronized MDFive getInstance() {
            if (instance == null) {
                instance = new MDFive();
            }
            return instance;
        }

        private MDFive() {
        }
    }
}
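
Two points about the splitter are worth spelling out. First, the bucket index is computed only from the MD5 string, so rows with an identical dedup key always land in the same bucket file; that invariant is what allows 4.3 to hold just one bucket's digests in memory at a time. Second, concatenating the fields as plain A + B + C means the field pairs ("ab", "c") and ("a", "bc") produce the same digest and would be treated as duplicates; joining with a delimiter avoids that. The sketch below is illustrative only (BucketSketch, DELIM, and dedupKey are names introduced here, not part of the original code):

public class BucketSketch {
    private static final int BUCKETS = 64;
    private static final String DELIM = "\u0001"; // assumed delimiter, unlikely in real field values

    // mirrors the splitter: the bucket depends only on the md5 string, so equal
    // dedup keys always map to the same bucket file
    static int partitionOf(String md5) {
        // Math.floorMod would also cover the Math.abs(Integer.MIN_VALUE) corner case
        return Math.abs(md5.hashCode()) % BUCKETS;
    }

    // joining with a delimiter keeps ("ab", "c") distinct from ("a", "bc")
    static String dedupKey(String... fields) {
        return String.join(DELIM, fields);
    }

    public static void main(String[] args) {
        System.out.println(dedupKey("ab", "c").equals(dedupKey("a", "bc"))); // false
        System.out.println(("ab" + "c").equals("a" + "bc"));                 // true: the collision
    }
}

As a design note, the JDK's built-in java.security.MessageDigest with the "MD5" algorithm produces the same digests as the hand-rolled MDFive above and can be instantiated per thread, which sidesteps the shared-state concern entirely.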

4.3 Multi-threaded file reads and dedup
package service.jmw.weektwo;

import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import service.jmw.HashSplitTool;

import java.io.*;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Dedup pass: read each bucket file and collect the ids of duplicate rows
 *
 * @author JMWANG
 */
public class DuplicateRemoval implements Runnable {

    public static void main(String[] args) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("FileInput", "directory containing the bucket files");
        jsonObject.put("FileOutput", "directory for the dedup output");
        ThreadPoolExecutor threadPoolExecutor = HashSplitTool.poolExecutor;

        try {
            threadPoolExecutor.execute(new DuplicateRemoval(jsonObject));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPoolExecutor.shutdown();
        }

    }

    JSONObject jsonObject;

    public DuplicateRemoval(JSONObject jsonObject) {
        this.jsonObject = jsonObject;
    }

    @Override
    public void run() {
        double countTime = 0;
        if (jsonObject.size() != 0) {
            String fileInput = jsonObject.getString("FileInput");
            String fileOutput = jsonObject.getString("FileOutput");

            Set<String> set = new HashSet<>();

            File folder = new File(fileInput);
            File[] files = folder.listFiles();
            for (File file : files) {
                StringBuilder stringBuilder = new StringBuilder();
                if (!file.exists()) {
                    continue;
                }
                List<String> list = null;
                try {
                    list = IOUtils.readLines(new FileInputStream(file), "UTF-8");
                } catch (IOException e) {
                    e.printStackTrace();
                }
                long start = System.currentTimeMillis();
                if (list != null && list.size() != 0) {
                    for (String line : list) {
                        if (StringUtils.isEmpty(line)) {
                            continue;
                        }
                        // each line is "objectId#md5"
                        String[] split = line.split("#");
                        if (split.length != 2) {
                            continue;
                        }
                        if (set.contains(split[1])) {
                            // md5 already seen: record the duplicate row's id
                            stringBuilder.append(split[0]).append("\n");
                        } else {
                            set.add(split[1]);
                        }
                        if (stringBuilder.length() > 1024 * 1024) {
                            try {
                                writeFile(fileOutput, "objectId.txt", stringBuilder.toString());
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                            // clear after flushing, otherwise the same ids are written again
                            stringBuilder.setLength(0);
                        }
                    }
                    if (stringBuilder.length() != 0) {
                        try {
                            writeFile(fileOutput, "objectId.txt", stringBuilder.toString());
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }
                // identical md5 values always land in the same bucket file,
                // so the seen-set can be reset between files
                set.clear();
                long end = System.currentTimeMillis();
                System.out.println((end - start) / 1000.0 + "s");
                countTime = countTime + (end - start) / 1000.0;

                stringBuilder.setLength(0);
            }
        }
        System.out.println(countTime + "s");
    }

    private void writeFile(String fileOutput, String fileName, String content) throws IOException {
        // create the output directory before opening the stream
        File dir = new File(fileOutput);
        if (!dir.exists()) {
            dir.mkdirs();
        }
        File file = new File(fileOutput + File.separator + fileName);
        try (BufferedWriter writer = new BufferedWriter(
                new OutputStreamWriter(new FileOutputStream(file, true), "UTF-8"))) {
            writer.write(content);
        }
    }
}
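
The pipeline stops at objectId.txt, a list of primary keys for the redundant rows; the original post does not show what to do with it. Assuming the goal is to delete those rows from the source table, a batched JDBC delete along the following lines would be the natural last step. This is a sketch only: YOUR_TABLE is a placeholder, DuplicateDeleter is an illustrative name, and the DataSource is any pool built from the 4.1 configuration.

import javax.sql.DataSource;
import java.io.BufferedReader;
import java.io.FileReader;
import java.sql.Connection;
import java.sql.PreparedStatement;

public class DuplicateDeleter {

    // ds: any pool built from the 4.1 properties (e.g. via DruidDataSourceFactory);
    // YOUR_TABLE stands in for the real table, OB_OBJECT_ID is the key column used above
    static void deleteDuplicates(DataSource ds, String idFile) throws Exception {
        String sql = "DELETE FROM YOUR_TABLE WHERE OB_OBJECT_ID = ?";
        try (Connection conn = ds.getConnection();
             PreparedStatement ps = conn.prepareStatement(sql);
             BufferedReader reader = new BufferedReader(new FileReader(idFile))) {
            conn.setAutoCommit(false);
            int pending = 0;
            String id;
            while ((id = reader.readLine()) != null) {
                if (id.isEmpty()) {
                    continue;
                }
                ps.setString(1, id);
                ps.addBatch();
                if (++pending == 1000) { // same batch size the splitter uses for its flushes
                    ps.executeBatch();
                    conn.commit();
                    pending = 0;
                }
            }
            if (pending > 0) {
                ps.executeBatch();
                conn.commit();
            }
        }
    }
}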


5. Throughput

Each thread processes roughly 20,000 records per second through the split-and-dedup pipeline, given a large enough workload. With 10 threads that is about 200,000 records per second in aggregate, so 250 million rows take about 250,000,000 / 200,000 ≈ 1,250 s, consistent with the observed 20 to 30 minutes end to end.






There is still plenty of room here to optimize both speed and memory use; suggestions and corrections are welcome.
