1 需求 在项目开发中需要处理100万多的数据,这些数据需要从mysql数据库中读取出来,再通过调用其他平台的接口推送数据。由于时间紧迫,数据需要在短时间内完成推送,采用单线程推送很慢,所以采用多线程推送来提高效率。

2 配置多线程 2.1 application.yml

thread-pool:

core-pool-size: 4

max-pool-size: 16

queue-capacity: 80

keep-alive-seconds: 120

2.2 创建ThreadPoolProperties

import lombok.Data;

import org.springframework.stereotype.Component;

import org.springframework.boot.context.properties.ConfigurationProperties;

@Data

@Component

@ConfigurationProperties(prefix = "thread-pool")

public class ThreadPoolProperties {

/**

* 线程池创建时候初始化的线程数

*/

private int corePoolSize;

/**

* 线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程

*/

private int maxPoolSize;

/**

* 用来缓冲执行任务的队列

*/

private int queueCapacity;

/**

* 允许线程的空闲时间:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁

*/

private int keepAliveSeconds;

}

2.3 创建ThreadPoolConfig

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.scheduling.annotation.EnableAsync;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@EnableAsync

@Configuration

public class ThreadPoolConfig {

private final ThreadPoolProperties threadPoolProperties;

@Autowired

public ThreadPoolConfig(ThreadPoolProperties threadPoolProperties) {

this.threadPoolProperties = threadPoolProperties;

}

@Bean(name = "threadPoolTaskExecutor")

public ThreadPoolTaskExecutor threadPoolTaskExecutor() {

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

executor.setCorePoolSize(threadPoolProperties.getCorePoolSize());

executor.setMaxPoolSize(threadPoolProperties.getMaxPoolSize());

executor.setQueueCapacity(threadPoolProperties.getQueueCapacity());

executor.setKeepAliveSeconds(threadPoolProperties.getKeepAliveSeconds());

executor.setThreadNamePrefix("thread-pool-");

return executor;

}

}

3 多线程批量数据处理

public RequestResult multiThreadPush() {

List historyStudentList = historyStudentMapper.getList(0, 65867);

// 分割集合

List> partitionData = partitionData(historyStudentList, 4);

ThreadPoolTaskExecutor executor = SpringUtil.getBean("threadPoolTaskExecutor", ThreadPoolTaskExecutor.class);

// 计数器

CountDownLatch latch = new CountDownLatch(partitionData.size());

for (List historyStudents : partitionData) {

executor.execute(() -> {

try {

for (HistoryStudent historyStudent : historyStudents) {

// 单个数据处理

//processSingleData(historyStudent);

}

} catch (Exception e) {

e.printStackTrace();

} finally {

latch.countDown();

}

});

}

try {

latch.await();

} catch (InterruptedException e) {

e.printStackTrace();

}

return RequestResult.success();

}

private List> partitionData(List dataList, int partitionSize) {

List> partitions = new ArrayList<>();

int size = dataList.size();

int batchSize = size / partitionSize;

for (int i = 0; i < partitionSize; i++) {

int fromIndex = i * batchSize;

int toIndex = (i == partitionSize - 1) ? size : fromIndex + batchSize;

partitions.add(dataList.subList(fromIndex, toIndex));

}

return partitions;

}

4 参考博客 Java多线程批量处理、线程池的使用 Java多线程处理大批量数据 java多线程批量处理数据

推荐链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: