Optimizing a Million-Row Database Insert with a Thread Pool
Building the User object
```java
private User buildUser(int i) {
    // Assemble a test User via the Lombok builder
    return User.builder()
            .openid("" + i)
            .name("user_" + i)
            .sex("1")
            .build();
}
```
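For reference, the snippets in this post assume a Lombok-built User entity roughly like the following sketch. The original article never shows the class, so the field list and the single-argument constructor used later (`new User("user_1")`) are assumptions:

```java
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

// Hypothetical entity sketch; fields are guesses based on how the code below uses User.
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class User {
    private Long id;
    private String openid;
    private String name;
    private String sex;

    // Convenience constructor used by the sharded insert task: new User("user_1")
    public User(String name) {
        this.name = name;
    }
}
```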
Initial version: single-threaded executor
Elapsed time: 29s
```java
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
private final AtomicInteger count = new AtomicInteger(0);

private class ThreadHandler implements Runnable {
    @Override
    public void run() {
        List<User> users = new ArrayList<>(10000);
        try {
            for (int i = 0; i < 1000000; i++) {
                users.add(buildUser(i));
                count.incrementAndGet();
                // Flush every 10,000 records; checking the list size avoids the
                // one-record flush at i == 0 that a plain i % 10000 == 0 check causes
                if (users.size() >= 10000) {
                    userService.saveBatch(users);
                    users.clear();
                }
            }
            // Flush whatever is left over
            if (!users.isEmpty()) {
                userService.saveBatch(users);
            }
        } catch (Exception e) {
            System.err.println("Insert failed: " + e.getMessage());
            e.printStackTrace();
        }
    }
}

@Test
public void testInsertBatch() throws Exception {
    long startTime = System.currentTimeMillis();
    Future<?> future = SECKILL_ORDER_EXECUTOR.submit(new ThreadHandler());
    future.get(); // wait for the single worker to finish
    long endTime = System.currentTimeMillis();
    System.out.println("Total time: " + (endTime - startTime) + "ms, total inserted: " + count.get());
    SECKILL_ORDER_EXECUTOR.shutdown();
}
```
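One thing worth checking before tuning threads at all: how fast saveBatch runs depends heavily on whether the JDBC driver actually batches the statements. If the database here is MySQL (the post doesn't say), Connector/J only rewrites a JDBC batch into a single multi-row INSERT when rewriteBatchedStatements=true is set. A sketch of the data source setup under that assumption (HikariCP is also an assumption):

```java
import javax.sql.DataSource;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

public class BatchInsertDataSourceConfig {

    // Assumption: MySQL behind HikariCP. Without rewriteBatchedStatements=true,
    // Connector/J executes the batched INSERTs one statement at a time, so the
    // saveBatch calls above pay per-statement overhead no matter how many threads run.
    public static DataSource dataSource() {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://localhost:3306/test?rewriteBatchedStatements=true");
        config.setUsername("root");
        config.setPassword("change-me");
        return new HikariDataSource(config);
    }
}
```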
Optimization: switch to a custom thread pool — total time: 11429ms, total inserted: 1000000
```java
private static final int CPU_CORES = Runtime.getRuntime().availableProcessors();

private static final ExecutorService CUSTOM_EXECUTOR = new ThreadPoolExecutor(
        CPU_CORES,                      // core pool size
        CPU_CORES * 2,                  // maximum pool size
        60L, TimeUnit.SECONDS,          // keep-alive for idle non-core threads
        new ArrayBlockingQueue<>(100),  // bounded work queue
        new ThreadFactory() {
            private final AtomicInteger threadNum = new AtomicInteger(1);
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "batch-insert-thread-" + threadNum.getAndIncrement());
            }
        },
        // When the queue is full, the submitting thread runs the task itself,
        // which throttles submission instead of rejecting tasks
        new ThreadPoolExecutor.CallerRunsPolicy()
);
```
Data sharding
```java
private final AtomicInteger totalCount = new AtomicInteger(0);
private static final int BATCH_SIZE = 5000;
private static final int TOTAL_DATA = 1000000;

class InsertTask implements Runnable {
    private final int start;
    private final int end;

    public InsertTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    // Note: @Transactional only takes effect when the method is invoked through a
    // Spring proxy. A task created with `new InsertTask(...)` and submitted to a plain
    // executor is not proxied, so each saveBatch call below still manages its own
    // transaction (see the TransactionTemplate sketch after this block).
    @Override
    @Transactional
    public void run() {
        List<User> batchList = new ArrayList<>(BATCH_SIZE);
        int localCount = 0;
        try {
            String namePrefix = "user_"; // reuse the prefix instead of rebuilding it per record
            for (int i = start; i < end; i++) {
                batchList.add(new User(namePrefix + i));
                localCount++;
                if (batchList.size() >= BATCH_SIZE) {
                    saveBatch(batchList);
                    batchList.clear();
                }
            }
            if (!batchList.isEmpty()) {
                saveBatch(batchList);
            }
            // Add the per-thread counter once, instead of contending on the
            // shared AtomicInteger for every single record
            totalCount.addAndGet(localCount);
        } catch (Exception e) {
            System.err.println("Thread " + Thread.currentThread().getName()
                    + " insert failed: " + e.getMessage());
        }
    }

    protected void saveBatch(List<User> users) {
        userService.saveBatch(users, BATCH_SIZE);
    }
}

@Test
public void testBatchInsertWithCustomPool() throws Exception {
    long startTime = System.currentTimeMillis();
    List<Future<?>> futures = new ArrayList<>();
    // Split the 1,000,000 records into ranges of 100,000 per task
    int perThreadData = 100000;
    int threadCount = (int) Math.ceil((double) TOTAL_DATA / perThreadData);
    for (int i = 0; i < threadCount; i++) {
        int start = i * perThreadData;
        int end = Math.min((i + 1) * perThreadData, TOTAL_DATA);
        futures.add(CUSTOM_EXECUTOR.submit(new InsertTask(start, end)));
    }
    // Wait for every shard to finish before stopping the clock
    for (Future<?> future : futures) {
        future.get();
    }
    CUSTOM_EXECUTOR.shutdown();
    CUSTOM_EXECUTOR.awaitTermination(1, TimeUnit.MINUTES);
    long endTime = System.currentTimeMillis();
    System.out.println("Total time: " + (endTime - startTime) + "ms, total inserted: " + totalCount.get());
}
```
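A note on the @Transactional above: since the task is constructed manually and run on a plain executor, Spring never intercepts run(), so each saveBatch call commits on its own. If the goal is one transaction per thread, as the summary below describes, a programmatic transaction is one way to get it. A minimal sketch, not the original author's code: it reuses userService, totalCount and BATCH_SIZE from the test above and assumes a TransactionTemplate bean can be injected:

```java
// Assumed: Spring's TransactionTemplate is available for injection in the test class.
@Autowired
private TransactionTemplate transactionTemplate;

class TransactionalInsertTask implements Runnable {
    private final int start;
    private final int end;

    TransactionalInsertTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    public void run() {
        // One programmatic transaction per thread/range, which is what the
        // @Transactional annotation above was presumably meant to achieve
        transactionTemplate.executeWithoutResult(status -> {
            List<User> batch = new ArrayList<>(BATCH_SIZE);
            for (int i = start; i < end; i++) {
                batch.add(new User("user_" + i));
                if (batch.size() >= BATCH_SIZE) {
                    userService.saveBatch(batch, BATCH_SIZE);
                    batch.clear();
                }
            }
            if (!batch.isEmpty()) {
                userService.saveBatch(batch, BATCH_SIZE);
            }
            totalCount.addAndGet(end - start);
        });
    }
}
```

One long transaction per 100,000-row range cuts commit overhead, but it also makes each transaction hold more undo/redo, so the range size is a trade-off rather than a free win.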
Adding a JVM warm-up: total time 10142ms, total inserted 1000000
About one second faster.
```java
private void warmUp() {
    // Insert a tiny batch first so JIT compilation, connection acquisition and
    // MyBatis-Plus initialization happen before the real measurement starts
    List<User> warmUpList = new ArrayList<>(100);
    for (int i = 0; i < 100; i++) {
        warmUpList.add(new User("warmup_" + i));
    }
    userService.saveBatch(warmUpList);
}
```
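The post doesn't show where warmUp() is invoked; presumably it runs before the timer starts, so the warm-up work is excluded from the measured 10142ms. A sketch of that placement (the test name and exact wiring are assumptions):

```java
@Test
public void testBatchInsertWithWarmUp() throws Exception {
    warmUp(); // hypothetical placement: warm up before the clock starts

    long startTime = System.currentTimeMillis();
    List<Future<?>> futures = new ArrayList<>();
    for (int i = 0; i < TOTAL_DATA / 100000; i++) {
        int start = i * 100000;
        futures.add(CUSTOM_EXECUTOR.submit(new InsertTask(start, start + 100000)));
    }
    for (Future<?> future : futures) {
        future.get(); // wait for all shards before stopping the clock
    }
    long endTime = System.currentTimeMillis();
    System.out.println("Total time: " + (endTime - startTime) + "ms, total inserted: " + totalCount.get());
}
```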
Summary: at first a single thread churned through the million records in 29 seconds, leaving the other CPU cores idle. Switching to a custom thread pool sized to the CPU core count and slicing the 1,000,000 records into ranges processed in parallel brought that down to about 11 seconds. A few details helped along the way: each thread keeps a local counter and adds it to the shared total once at the end instead of contending on the atomic counter for every record; the list capacity and the name prefix are prepared up front to cut down on repeated object creation; and on the database side, the intent was for each thread to commit its whole range in one transaction to reduce round trips. Finally, a warm-up pass lets the system get going before the timer starts, which pushed the total down to about 10 seconds — close to three times as fast as the original (29s → roughly 10s).
Why it only got about three times faster: inserting into the database is an IO-bound operation — writing to disk, handling transactions, maintaining indexes. The CPU is not the one doing most of that work, so spreading the job across more cores only speeds up the CPU-side part; the database IO remains the bottleneck.