1 需求 在项目开发中需要处理100万多的数据,这些数据需要从mysql数据库中读取出来,再通过调用其他平台的接口推送数据。由于时间紧迫,数据需要在短时间内完成推送,采用单线程推送很慢,所以采用多线程推送来提高效率。
2 配置多线程 2.1 application.yml
thread-pool:
core-pool-size: 4
max-pool-size: 16
queue-capacity: 80
keep-alive-seconds: 120
2.2 创建ThreadPoolProperties
import lombok.Data;
import org.springframework.stereotype.Component;
import org.springframework.boot.context.properties.ConfigurationProperties;
@Data
@Component
@ConfigurationProperties(prefix = "thread-pool")
public class ThreadPoolProperties {
/**
* 线程池创建时候初始化的线程数
*/
private int corePoolSize;
/**
* 线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
*/
private int maxPoolSize;
/**
* 用来缓冲执行任务的队列
*/
private int queueCapacity;
/**
* 允许线程的空闲时间:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
*/
private int keepAliveSeconds;
}
2.3 创建ThreadPoolConfig
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@EnableAsync
@Configuration
public class ThreadPoolConfig {
private final ThreadPoolProperties threadPoolProperties;
@Autowired
public ThreadPoolConfig(ThreadPoolProperties threadPoolProperties) {
this.threadPoolProperties = threadPoolProperties;
}
@Bean(name = "threadPoolTaskExecutor")
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(threadPoolProperties.getCorePoolSize());
executor.setMaxPoolSize(threadPoolProperties.getMaxPoolSize());
executor.setQueueCapacity(threadPoolProperties.getQueueCapacity());
executor.setKeepAliveSeconds(threadPoolProperties.getKeepAliveSeconds());
executor.setThreadNamePrefix("thread-pool-");
return executor;
}
}
3 多线程批量数据处理
public RequestResult multiThreadPush() {
List
// 分割集合
List> partitionData = partitionData(historyStudentList, 4);
ThreadPoolTaskExecutor executor = SpringUtil.getBean("threadPoolTaskExecutor", ThreadPoolTaskExecutor.class);
// 计数器
CountDownLatch latch = new CountDownLatch(partitionData.size());
for (List
executor.execute(() -> {
try {
for (HistoryStudent historyStudent : historyStudents) {
// 单个数据处理
//processSingleData(historyStudent);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
latch.countDown();
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
return RequestResult.success();
}
private List> partitionData(List
List> partitions = new ArrayList<>();
int size = dataList.size();
int batchSize = size / partitionSize;
for (int i = 0; i < partitionSize; i++) {
int fromIndex = i * batchSize;
int toIndex = (i == partitionSize - 1) ? size : fromIndex + batchSize;
partitions.add(dataList.subList(fromIndex, toIndex));
}
return partitions;
}
4 参考博客 Java多线程批量处理、线程池的使用 Java多线程处理大批量数据 java多线程批量处理数据
推荐链接
发表评论