线程池优化百万数据插入数据库

构建User

1
2
3
4
5
6
7
private User buildUser(int i){
return User.builder()
.openid(""+i)
.name("user_"+i)
.sex("1")
.build();
}

最初版本单线程池

耗时29s

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// 单线程线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

// 用AtomicInteger保证线程安全(原子操作+可见性)
private final AtomicInteger count = new AtomicInteger(0);
private class ThreadHandler implements Runnable {
@Override
public void run() {
//指定ArrayList初始容量(1000),减少扩容开销
List<User> users = new ArrayList<>(10000);
try {
for (int i = 0; i < 1000000; i++) {
users.add(buildUser(i));
count.incrementAndGet(); // 原子自增,线程安全

if (i % 10000 == 0) {
userService.saveBatch(users);
users.clear();
}
}
// 处理最后不足数据
if (!users.isEmpty()) {
userService.saveBatch(users);
}
} catch (Exception e) {
//捕获异常,避免任务无声中断
System.err.println("插入失败:" + e.getMessage());
e.printStackTrace();
}
}
}

@Test
public void testInsertBatch() throws Exception {
long startTime = System.currentTimeMillis();

// 用Future获取任务执行结果,阻塞等待任务完成
Future<?> future = SECKILL_ORDER_EXECUTOR.submit(new ThreadHandler());
future.get(); // 关键:等待子线程任务执行完毕

long endTime = System.currentTimeMillis();

// 此时count已确保是最终值(因为future.get()已等待任务完成)
System.out.println("总耗时:" + (endTime - startTime) + "ms,插入总数:" + count.get());

//关闭线程池,避免资源泄漏
SECKILL_ORDER_EXECUTOR.shutdown();
}

优化:线程池改造自定义线程池 总耗时:11429ms,插入总数:1000000

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 优化1:自定义线程池(IO密集型任务,线程数=CPU核心数*2)
private static final int CPU_CORES = Runtime.getRuntime().availableProcessors();
private static final ExecutorService CUSTOM_EXECUTOR = new ThreadPoolExecutor(
CPU_CORES, // 核心线程数(CPU核心数,减少上下文切换)
CPU_CORES * 2, // 最大线程数(IO密集型可适当增加)
60L, TimeUnit.SECONDS, // 空闲线程存活时间
new ArrayBlockingQueue<>(100), // 有界队列(避免无界队列OOM)
new ThreadFactory() { // 自定义线程工厂(便于调试)
private final AtomicInteger threadNum = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "batch-insert-thread-" + threadNum.getAndIncrement());
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略:提交者执行(缓解压力)
);

数据分片

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
// 用AtomicInteger保证线程安全(原子操作+可见性)
private final AtomicInteger totalCount = new AtomicInteger(0);
// 每批插入数量(根据数据库性能调整)
private static final int BATCH_SIZE = 5000;
// 总数据量
private static final int TOTAL_DATA = 1000000;

// 数据分片任务
class InsertTask implements Runnable {
private final int start; // 分片起始索引
private final int end; // 分片结束索引

public InsertTask(int start, int end) {
this.start = start;
this.end = end;
}

@Override
// 事务内批量插入(减少事务提交开销) 在run方法外面减少事务提交次数
@Transactional
public void run() {
List<User> batchList = new ArrayList<>(BATCH_SIZE); // 预分配容量
int localCount = 0; // 线程内计数(减少原子操作频率)

try {
String namePrefix = "user_"; // 预定义前缀,减少对象创建
for (int i = start; i < end; i++) {
// 直接使用构造方法创建对象,减少setter开销
batchList.add(new User(namePrefix+i));
localCount++;

// 达到批次大小则插入
if (batchList.size() >= BATCH_SIZE) {
saveBatch(batchList);
batchList.clear();
}
}
// 处理剩余数据
if (!batchList.isEmpty()) {
saveBatch(batchList);
}
// 最后统一更新总计数(减少AtomicInteger操作次数)
totalCount.addAndGet(localCount);
} catch (Exception e) {
System.err.println("线程" + Thread.currentThread().getName() + "插入失败:" + e.getMessage());
}
}
protected void saveBatch(List<User> users) {
userService.saveBatch(users, BATCH_SIZE); // 指定批次大小适配框架
}
}

@Test
public void testBatchInsertWithCustomPool() throws Exception {

long startTime = System.currentTimeMillis();
List<Future<?>> futures = new ArrayList<>();

// 优化2:数据分片(每个线程处理约5-10万条,减少线程调度开销)
int perThreadData = 100000; // 每个线程处理10万条
int threadCount = (int) Math.ceil((double) TOTAL_DATA / perThreadData);

for (int i = 0; i < threadCount; i++) {
int start = i * perThreadData;
int end = Math.min((i + 1) * perThreadData, TOTAL_DATA);
futures.add(CUSTOM_EXECUTOR.submit(new InsertTask(start, end)));
}

// 等待所有任务完成
for (Future<?> future : futures) {
future.get();
}

// 关闭线程池
CUSTOM_EXECUTOR.shutdown();
// 等待线程池完全关闭(最多等1分钟)
CUSTOM_EXECUTOR.awaitTermination(1, TimeUnit.MINUTES);

long endTime = System.currentTimeMillis();
System.out.println("总耗时:" + (endTime - startTime) + "ms,插入总数:" + totalCount.get());
}

加入jvm预热看看 总耗时:10142ms,插入总数:1000000

优化了一秒

1
2
3
4
5
6
7
8
// JVM预热方法
private void warmUp() {
List<User> warmUpList = new ArrayList<>(100);
for (int i = 0; i < 100; i++) {
warmUpList.add(new User("warmup_" + i));
}
userService.saveBatch(warmUpList);
}

总结:一开始啊,用单线程处理百万条数据,慢悠悠地跑了 29 秒,纯属浪费多核 CPU 的力气。后来改了改,搞了个自定义线程池,按 CPU 核心数分配线程,把 100 万条数据切成一块一块的,让多个线程同时处理,这一下就快多了,直接降到 11 秒左右。中间还琢磨了些细节,比如计数的时候不让线程频繁抢着算,而是各自记完最后一起汇总;列表和字符串也提前准备好,少搞些重复创建的活儿。数据库那边也调了调,让每个线程处理的数据都放一个事务里提交,少让数据库来回跑 IO。最后还加了个 “热身” 步骤,让系统先跑点数据进入状态,这么一折腾,耗时就压到 10 秒了,比原来快了快两倍呢。

为什么只快了两倍:因为插入数据库是IO操作,在磁盘,处理事务,维护索引。这些操作中,并不是CPU单独干活,所以开多个核心数,也只是加快了CPU处理工作的效率。