1. 自定义线程池

1.1 示例代码

/**

* 自定义线程池

*

* 优点:可以自定义参数

*

*/

@Test

public void newThreadPoolExecutor() {

ThreadPoolExecutor executor = new ThreadPoolExecutor(

// 核心线程数

3,

// 最大线程数

5,

// 空闲线程最大存活时间

60L,

// 空闲线程最大存活时间单位

TimeUnit.SECONDS,

// 等待队列及大小

new ArrayBlockingQueue<>(100),

// 创建新线程时使用的工厂

Executors.defaultThreadFactory(),

// 当线程池达到最大时的处理策略

// new ThreadPoolExecutor.AbortPolicy() // 抛出RejectedExecutionHandler异常

new ThreadPoolExecutor.CallerRunsPolicy() // 交由调用者的线程执行

// new ThreadPoolExecutor.DiscardOldestPolicy() // 丢掉最早未处理的任务

// new ThreadPoolExecutor.DiscardPolicy() // 丢掉新提交的任务

);

// 总共5个任务

for (int i = 1; i <= 5; i++) {

int taskIndex = i;

executor.execute(() -> {

log.info("线程 " + Thread.currentThread().getName() + " 正在执行任务 " + taskIndex);

// 每个任务耗时1秒

try {

TimeUnit.SECONDS.sleep(1);

} catch (InterruptedException e) {

e.printStackTrace();

}

});

}

executor.shutdown();

}

  控制台打印:

20:09:50.032 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-1 正在执行任务 1

20:09:50.032 [pool-1-thread-2] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-2 正在执行任务 2

20:09:50.032 [pool-1-thread-3] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-3 正在执行任务 3

20:09:51.038 [pool-1-thread-2] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-2 正在执行任务 5

20:09:51.038 [pool-1-thread-3] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-3 正在执行任务 4

2. 固定长度线程池

2.1 示例代码

/**

* 固定大小线程池

*

* 优点:当任务执行较快,且任务较少时使用方便

*

*

* 风险:当处理较慢时,等待队列的任务堆积会导致OOM

*

*/

@Test

public void newFixThreadPool() {

// 3个固定线程

ExecutorService executorService = Executors.newFixedThreadPool(3);

// 总共5个任务

for (int i = 1; i <= 5; i++) {

int taskIndex = i;

executorService.execute(() -> {

log.info("线程 " + Thread.currentThread().getName() + " 正在执行任务 " + taskIndex);

// 每个任务耗时1秒

try {

TimeUnit.SECONDS.sleep(1);

} catch (InterruptedException e) {

e.printStackTrace();

}

});

}

executorService.shutdown();

}

  控制台打印:

20:16:27.040 [pool-1-thread-2] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-2 正在执行任务 2

20:16:27.040 [pool-1-thread-3] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-3 正在执行任务 3

20:16:27.040 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-1 正在执行任务 1

20:16:28.048 [pool-1-thread-3] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-3 正在执行任务 4

20:16:28.048 [pool-1-thread-2] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-2 正在执行任务 5

  前3个任务被同时执行,因为刚好有3个核心线程。后2个任务会被存放到阻塞队列,当执行前3个任务的某个线程空闲时会从队列中获取任务并执行。

2.2 源码剖析

/**

* Creates a thread pool that reuses a fixed number of threads

* operating off a shared unbounded queue. At any point, at most

* {@code nThreads} threads will be active processing tasks.

* If additional tasks are submitted when all threads are active,

* they will wait in the queue until a thread is available.

* If any thread terminates due to a failure during execution

* prior to shutdown, a new one will take its place if needed to

* execute subsequent tasks. The threads in the pool will exist

* until it is explicitly {@link ExecutorService#shutdown shutdown}.

*

* @param nThreads the number of threads in the pool

* @return the newly created thread pool

* @throws IllegalArgumentException if {@code nThreads <= 0}

*/

public static ExecutorService newFixedThreadPool(int nThreads) {

return new ThreadPoolExecutor(nThreads, nThreads,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue());

}

  该类型线程池的核心线程数和最大线程数为指定的参数,空闲线程的存活线程时间为0毫秒,等待队列使用LinkedBlockingQueue,初始化大小为Integer.MAX_VALUE(即:2147483647)。   当任务执行较慢时,阻塞队列存有大量的任务等待,这些任务会占用大量的内存,从而可能导致OOM。

3. 单一线程池

3.1 示例代码

/**

* 单一线程池

*

* 优势:保存任务按照提交的顺序执行

*

*

* 风险:当处理较慢时,等待队列的任务堆积会导致OOM

*

*/

@Test

public void newSingleThreadExecutor() {

// 1个线程

ExecutorService executor = Executors.newSingleThreadExecutor();

// 总共5个任务

for (int i = 1; i <= 5; i++) {

int taskIndex = i;

executor.execute(() -> {

log.info("线程 " + Thread.currentThread().getName() + " 正在执行任务 " + taskIndex);

// 每个任务耗时1秒

try {

TimeUnit.SECONDS.sleep(1);

} catch (InterruptedException e) {

e.printStackTrace();

}

});

}

executor.shutdown();

}

  控制台打印:

20:31:04.970 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-1 正在执行任务 1

20:31:05.974 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-1 正在执行任务 2

20:31:06.974 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-1 正在执行任务 3

20:31:07.975 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-1 正在执行任务 4

20:31:08.976 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-1 正在执行任务 5

  所有任务按照提交的顺序执行。

3.2 源码剖析

/**

* Creates an Executor that uses a single worker thread operating

* off an unbounded queue. (Note however that if this single

* thread terminates due to a failure during execution prior to

* shutdown, a new one will take its place if needed to execute

* subsequent tasks.) Tasks are guaranteed to execute

* sequentially, and no more than one task will be active at any

* given time. Unlike the otherwise equivalent

* {@code newFixedThreadPool(1)} the returned executor is

* guaranteed not to be reconfigurable to use additional threads.

*

* @return the newly created single-threaded Executor

*/

public static ExecutorService newSingleThreadExecutor() {

return new FinalizableDelegatedExecutorService

(new ThreadPoolExecutor(1, 1,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue()));

}

  该类型线程池的核心线程数和最大线程数都为1,空闲线程的存活线程时间为0毫秒,等待队列使用LinkedBlockingQueue,初始化大小为Integer.MAX_VALUE(即:2147483647)。   当任务执行较慢时,阻塞队列存有大量的任务等待,这些任务会占用大量的内存,从而可能导致OOM。

4. 共享线程池

4.1 示例代码

/**

* 共享线程池

*

* 优势:当在某一时间段内任务较多,且执行较快时方便使用

*

*

* 风险:当处理较慢时,会创建大量的线程

*

*/

@Test

public void newCachedThreadPool() {

ExecutorService executor = Executors.newCachedThreadPool();

// 总共5个任务

for (int i = 1; i <= 5; i++) {

int taskIndex = i;

executor.execute(() -> {

log.info("线程 " + Thread.currentThread().getName() + " 正在执行任务 " + taskIndex);

// 每个任务耗时1秒

try {

TimeUnit.SECONDS.sleep(1);

} catch (InterruptedException e) {

e.printStackTrace();

}

});

}

executor.shutdown();

}

  控制台打印:

20:45:31.351 [pool-1-thread-4] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-4 正在执行任务 4

20:45:31.351 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-1 正在执行任务 1

20:45:31.351 [pool-1-thread-5] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-5 正在执行任务 5

20:45:31.358 [pool-1-thread-2] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-2 正在执行任务 2

20:45:31.359 [pool-1-thread-3] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-3 正在执行任务 3

  每一个任务都创建了新的线程。

4.2 源码剖析

/**

* Creates a thread pool that creates new threads as needed, but

* will reuse previously constructed threads when they are

* available. These pools will typically improve the performance

* of programs that execute many short-lived asynchronous tasks.

* Calls to {@code execute} will reuse previously constructed

* threads if available. If no existing thread is available, a new

* thread will be created and added to the pool. Threads that have

* not been used for sixty seconds are terminated and removed from

* the cache. Thus, a pool that remains idle for long enough will

* not consume any resources. Note that pools with similar

* properties but different details (for example, timeout parameters)

* may be created using {@link ThreadPoolExecutor} constructors.

*

* @return the newly created thread pool

*/

public static ExecutorService newCachedThreadPool() {

return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

60L, TimeUnit.SECONDS,

new SynchronousQueue());

}

  该类型线程池的核心线程数为0,最大线程数为Integer.MAX_VALUE(即:2147483647),空闲线程最大存活时间为60秒,等待队列使用SynchronousQueue,该队列不存储数据,只做转发,具体可参考:【并发编程】Java 阻塞队列。   当任务较多或执行较慢时,会创建大量的线程,从而导致OOM。

5. 定时线程池

5.1 示例代码

/**

* 定时线程池

*

* 优点:可以定时执行某些任务

*

*

* 风险:当处理较慢时,等待队列的任务堆积会导致OOM

*

*/

@Test

public void newScheduledThreadPool() {

// // 单一线程

// ExecutorService executor = Executors.newSingleThreadScheduledExecutor();

// 指定核心线程数

ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);

executor.schedule(() -> {

log.info("3秒后开始执行,以后不再执行");

// 每个任务耗时1秒

try {

TimeUnit.SECONDS.sleep(1);

} catch (InterruptedException e) {

e.printStackTrace();

}

}, 3, TimeUnit.SECONDS);

//

// executor.scheduleAtFixedRate(() -> {

// log.info("3秒后开始执行,以后每2秒执行一次");

//

// // 每个任务耗时1秒

// try {

// TimeUnit.SECONDS.sleep(1);

// } catch (InterruptedException e) {

// e.printStackTrace();

// }

// }, 3, 2, TimeUnit.SECONDS);

//

// executor.scheduleWithFixedDelay(() -> {

// log.info("3秒后开始执行,以后延迟2秒执行一次");

//

// // 每个任务耗时1秒

// try {

// TimeUnit.SECONDS.sleep(1);

// } catch (InterruptedException e) {

// e.printStackTrace();

// }

// }, 3, 2, TimeUnit.SECONDS);

}

  控制台打印 - 1:

21:18:46.494 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 3秒后开始执行,以后不再执行

  启动后3秒开始执行,执行完成后不再继续执行。   控制台打印 - 2:

21:22:47.078 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 3秒后开始执行,以后每2秒执行一次

21:22:49.075 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 3秒后开始执行,以后每2秒执行一次

21:22:51.075 [pool-1-thread-2] INFO com.c3stones.test.ThreadPoolTest - 3秒后开始执行,以后每2秒执行一次

  启动后3秒开始执行,以后每两秒执行一次。   控制台打印 - 3:

21:28:09.701 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 3秒后开始执行,以后延迟2秒执行一次

21:28:12.705 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 3秒后开始执行,以后延迟2秒执行一次

21:28:15.707 [pool-1-thread-2] INFO com.c3stones.test.ThreadPoolTest - 3秒后开始执行,以后延迟2秒执行一次

  启动后3秒开始执行,以后每次执行时间为任务的耗时时间加固定的延迟时间。   假设每次任务固定延迟2秒,第一次任务在第3秒开始执行,任务耗时1秒;第二次任务将在第一次完成后2秒开始执行(即第6秒),耗时2秒;第三次任务将在第二次完成后2秒开始执行(即第10秒),依次类推。

6. SpringBoot中注入异步线程池

6.1 自定义线程配置类

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.core.task.TaskExecutor;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**

* 自定义线程池配置类

*

* @author CL

*/

@Configuration

public class TaskExecutorConfig {

/**

* 自定义任务执行器

*

* @return {@link TaskExecutor}

*/

@Bean

public TaskExecutor taskExecutor() {

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

// 核心线程数,默认1

int corePoolSize = Runtime.getRuntime().availableProcessors();

executor.setCorePoolSize(corePoolSize);

// 最大线程数,默认Integer.MAX_VALUE

executor.setMaxPoolSize(corePoolSize * 2 + 1);

// 空闲线程最大存活时间,默认60秒

executor.setKeepAliveSeconds(3);

// 等待队列及大小,默认Integer.MAX_VALUE

executor.setQueueCapacity(500);

// 线程的名称前缀,默认该Bean名称简写:org.springframework.util.ClassUtils.getShortName(java.lang.Class)

executor.setThreadNamePrefix("custom-thread-");

// 当线程池达到最大时的处理策略,默认抛出RejectedExecutionHandler异常

// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); // 抛出RejectedExecutionHandler异常

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 交由调用者的线程执行

// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); // 丢掉最早未处理的任务

// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); // 丢掉新提交的任务

// 等待所有任务结束后再关闭线程池,默认false

executor.setWaitForTasksToCompleteOnShutdown(true);

// 等待所有任务结束最长等待时间,默认0毫秒

executor.setAwaitTerminationSeconds(10);

// 执行初始化

executor.initialize();

return executor;

}

}

在Service注入使用

/**

* 示例Service

*

* @author CL

*/

public interface DemoService {

/**

* 示例方法

*

* @return {@link String}

*/

void demo();

}

import lombok.extern.slf4j.Slf4j;

import org.springframework.core.task.TaskExecutor;

import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**

* 示例Service实现

*

* @author CL

*/

@Slf4j

@Service

public class DemoServiceImpl implements DemoService {

@Resource

private TaskExecutor taskExecutor;

/**

* 示例方法

*/

@Override

public void demo() {

taskExecutor.execute(() -> {

log.info("线程 " + Thread.currentThread().getName() + " 正在执行Service中的方法");

});

}

}

异步任务指定线程池

import lombok.extern.slf4j.Slf4j;

import org.springframework.scheduling.annotation.Async;

import org.springframework.scheduling.annotation.EnableAsync;

import org.springframework.stereotype.Component;

/**

* 示例异步任务

*

* @author CL

*/

@Slf4j

@Component

@EnableAsync

public class DemoAsync {

/**

* 示例方法

*/

@Async(value = "taskExecutor")

public void demo() {

log.info("线程 " + Thread.currentThread().getName() + " 正在执行Async中的方法");

}

}

定时任务调度指定线程池

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.scheduling.TaskScheduler;

import org.springframework.scheduling.annotation.SchedulingConfigurer;

import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

import org.springframework.scheduling.config.ScheduledTaskRegistrar;

import java.util.concurrent.Executors;

import java.util.concurrent.ScheduledExecutorService;

import java.util.concurrent.ThreadPoolExecutor;

/**

* 自定义定时任务调度配置类

*

* @author CL

*/

@Configuration

public class SheduledConfig implements SchedulingConfigurer {

/**

* 配置定时任务

*

* @param scheduledTaskRegistrar 配置任务注册器

*/

@Override

public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {

scheduledTaskRegistrar.setScheduler(taskScheduler());

// // 第二种方式

// scheduledTaskRegistrar.setScheduler(scheduledExecutorService());

}

/**

* 自定义任务调度器

*

* @return {@link TaskScheduler}

*/

@Bean

public TaskScheduler taskScheduler() {

ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler();

executor.setPoolSize(5);

executor.setThreadNamePrefix("custom-scheduler-");

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

executor.initialize();

return executor;

}

// /**

// * 自定义任务线程池

// *

// * @return {@link ScheduledExecutorService}

// */

// @Bean

// public ScheduledExecutorService scheduledExecutorService() {

// return Executors.newScheduledThreadPool(5);

// }

}

6.2 测试

编写测试Controller

import com.c3tones.async.DemoAsync;

import com.c3tones.service.DemoService;

import lombok.extern.slf4j.Slf4j;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**

* 示例Controller

*

* @author CL

*/

@Slf4j

@RestController

public class DemoController {

@Resource

private DemoService demoService;

@Resource

private DemoAsync demoAsync;

/**

* Service示例方法

*

* @return {@link String}

*/

@RequestMapping("/service")

public void service() {

log.info("Service示例方法开始执行");

demoService.demo();

log.info("Service示例方法结束执行");

}

/**

* 异步示例方法

*

* @return {@link String}

*/

@RequestMapping("/async")

public void async() {

log.info("异步示例方法开始执行");

demoAsync.demo();

log.info("异步示例方法结束执行");

}

}

启动项目测试Service中的自定义线程池

curl http://127.0.0.1:8080/service

  控制台打印:

2023-03-19 22:26:26.896 INFO 136568 --- [nio-8080-exec-3] com.c3tones.controller.DemoController : Service示例方法开始执行

2023-03-19 22:26:26.897 INFO 136568 --- [nio-8080-exec-3] com.c3tones.controller.DemoController : Service示例方法结束执行

2023-03-19 22:26:26.897 INFO 136568 --- [custom-thread-1] com.c3tones.service.DemoServiceImpl : 线程 custom-thread-1 正在执行Service中的方法

  调用接口同步打印日志,自定义线程异步执行任务。

测试异步任务中的自定义线程池

curl http://127.0.0.1:8080/async

  控制台打印:

2023-03-19 22:28:08.349 INFO 136568 --- [nio-8080-exec-7] com.c3tones.controller.DemoController : 异步示例方法开始执行

2023-03-19 22:28:08.355 INFO 136568 --- [nio-8080-exec-7] com.c3tones.controller.DemoController : 异步示例方法结束执行

2023-03-19 22:28:08.363 INFO 136568 --- [custom-thread-2] com.c3tones.async.DemoAsync : 线程 custom-thread-2 正在执行Async中的方法

  调用接口同步打印日志,异步线程异步执行任务。

测试定时任务中的自定义线程池

编写测试方法 import lombok.extern.slf4j.Slf4j;

import org.springframework.scheduling.annotation.EnableScheduling;

import org.springframework.scheduling.annotation.Scheduled;

import org.springframework.stereotype.Component;

/**

* 示例定时任务

*

* @author CL

*/

@Slf4j

@Component

@EnableScheduling

public class DemoScheduled {

/**

* 示例方法

*/

@Scheduled(cron = "0/3 * * * * ? ")

public void demo() {

log.info("线程 " + Thread.currentThread().getName() + " 正在执行Scheduled中的方法");

}

}

启动服务   控制台打印: 2023-03-19 22:30:24.002 INFO 136568 --- [tom-scheduler-3] com.c3tones.sheduled.DemoScheduled : 线程 custom-scheduler-3 正在执行Scheduled中的方法

2023-03-19 22:30:27.002 INFO 136568 --- [tom-scheduler-3] com.c3tones.sheduled.DemoScheduled : 线程 custom-scheduler-3 正在执行Scheduled中的方法

2023-03-19 22:30:30.001 INFO 136568 --- [tom-scheduler-3] com.c3tones.sheduled.DemoScheduled : 线程 custom-scheduler-3 正在执行Scheduled中的方法

  定时任务从0秒开始,每3秒执行一次任务。

7. 项目地址

  thread-demo

推荐阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: