目录

一、线程池的概念

二、线程池的工作流程

(1)线程参数 

(2)拒绝策略

(3)线程池的工作流程

(4)线程池的参数设置

三、标准库中的线程池

(1)Executors 创建线程池的几种方式

四、线程池的实现

 

一、线程池的概念

        一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 

        场景举例说明:线程池就像一个空壳公司,没有雇佣任何人。每当有任务时,起一个线程进行处理模式。就类似于每当有业务到来,就随机找到一个人让他完成业务,当他完成后就将他解雇。但是很快公司老板发现了一个问题,就是每次招聘和解雇员工成本是很高的。所以他指定了一个指标,公司业务人员会扩招到4个人,同时会随着业务扩大逐渐增大雇的人数。于是当再有业务来时,如果现在公司还没 4个人,就雇一个人去送快递。如果现在公司已经有了4个人,就把业务放到一个本上记录下来,等到4个快递人员空闲的时候去处理。这个就是线程池的模式。

        线程池最大的好处就是减少每次启动、销毁线程的损耗。

二、线程池的工作流程

这里提供一个网址链接,可以查看Java包的详细实现代码:Java Platform SE 8

        标准库线程池在javaJUC(java.util.concurrent)这个包中。

        关于线程池参数和线程池工作流程问题,面试中会经常问到。 

 

(1)线程参数 

线程池参数:

        如果成立了一个公司,公司里面有正式工和临时工。正式工:不能随便开除,开除要有赔偿。而临时工:在业务很多时招聘,没有任务时,就解雇。公司优先选择正式工用,当正式工加班也干不完时,就会招聘临时工。

1.corePoolSize:核心线程数。代表常备线程数。即正式工的数量。

2.maximumPoolSize:最大线程数。代表常备线程数+临时线程数。正式工+临时工的数量

3.keepAliveTime:线程空闲时间。临时工不可以干完活就解雇,公司也会观察一段时间,比如最近一周,都没有什么较多的任务,这些临时工就会被解雇。

4.TimeUnit unit:时间单位。一般会用秒和毫秒。

5.workQueue:工作队列。没有临时工的时候,任务太多了,正式工做不完,就把任务存储在工作队列中。

6.ThreadFactory threadFactory:线程工厂。用线什么去创建线程。比如线程的命名。一般使用默认的线程工厂就可以。

7.RejectedExecutionHandler handler:拒绝策略。任务太多的时候,就要把任务记录到本子上,当本子也记录不下的时候,就会执行拒绝策略。

 

(2)拒绝策略

拒绝策略的四种方式:

方式一:拒绝任务,并抛弃。领导分配了很多任务,任务1/2/3/4。当任务4来了的时候直接拒接,并和领导吵了一架。

方式二:由调用者来执行。领导分配了很多任务,任务1/2/3/4。当任务4来了的时候拒绝任务,让领导来做。

方式三:把新任务加入,丢弃最早的任务。领导分配了很多任务,任务1/2/3/4。当任务4来了的时候可以接受任务4,拒绝任务1。

方式四:直接丢弃,什么也不做。领导分配了很多任务,任务1/2/3/4。当任务4来了的时候一个也不做。

(3)线程池的工作流程

1.最开始的时候,线程池是空的。

2.随着任务的提交,开始创建线程。

1)if(当前线程数

2)if(当前线程数==corePoolSize核心线程数),把任务添加到工作队列中。

3)当队列满了,if(当前线程数

4)当队列满了,if(当前线程数==maximumPoolSize最大线程数),执行拒绝策略。

3.随着任务执行,剩余任务逐渐减少。逐渐有了空闲的线程。

if(空闲时间>keepAliveTime线程空闲时间&&当前线程数>corePoolSize核心线程数),销毁线程,一直到当前线程数==corePoolSize核心线程数

(4)线程池的参数设置

        线程池的参数设计也是面试者常遇见的问题。网上和参考书上都给了很多建议,具体需要根据项目来定。一般可以分为CPU密集型(大多数时间,任务需要CPU执行。任务需要大量CPU参与运算)和IO密集型(当前服务,去等待其他服务返回)。例如:大数据团队,很多工作就是跑SQL,数据的清洗、分析等等。跑一次SQL,可能需要一个小时(在这期间除了等待数据的返回。什么也不能做)

        CPU密集型:线程数=CPU的核数或者线程数=CPU的核数+1

        IO密集型:线程数=CPU的核数*(CPU等待时间/CPU执行时间)或者线程数=2*CPU的核数+1

        以上这些都是理论,实际上以压测为准(压力测试)。一般项目都要需要进行压测,了解服务的一个瓶颈,最多能承担多大流量。

        QA是对整个项目负责的,测试时,也是站在整个项目的角度去测试的,一般关注接口响应时间(我们使用多线程的原因,是为了提高接口的响应时间)。CPU负载情况、能支持的流量

三、标准库中的线程池

(1)Executors 创建线程池的几种方式

常见的线程池       1.newFixedThreadPool: 创建固定线程数的线程池。核心线程数=最大线程数。没有限制。创建一个固定数量的线程池,工作中使用频率最高。

2.newCachedThreadPool: 创建线程数目动态增长的线程池。核心线程数=0。创建一个自动扩容的线程池,适用于并发不固定的短期小任务。

3.newSingleThreadExecutor: 创建只包含单个线程的线程池。核心线程数=最大线程数=1。创建一个只有单个线程的线程池。例如:从A地到B地,需要运送货物,每次运货需要2天,但是运的货并不着急,就可以创建一个线程,专门用来运货。

4.newScheduledThreadPool: 设定延迟时间后执行命令,或者定期执行命令。是进阶版的 Timer。Timer如果中间一个任务异常了,整个任务就会异常。Timer是一个线程在执行,如果上一个任务执行的过长,就会导致下一个无人延期执行。

Executors 本质上是 ThreadPoolExecutor 类的封装。ThreadPoolExecutor 提供了更多的可选参数, 可以进一步细化线程池行为的设定。

 

1.使用 Executors.newFixedThreadPool(10) 能创建出固定包含 10 个线程的线程池。 2.返回值类型为 ExecutorService。 3.通过 ExecutorService.submit 可以注册一个任务到线程池中。

创建代码:

ExecutorService pool = Executors.newFixedThreadPool(10);

pool.submit(new Runnable() {

@Override

public void run() {

System.out.println("hello");

}

});

如果这些线程池,不能满足需求的话就使用newThreadPoolExector来创建。

public class Demo {

public static void main(String[] args) {

//可以自己创建线程池

Executor executor = new ThreadPoolExecutor(3,100,1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

// //jdk 还提供了一些其他的创建方式

// ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);

//

// executorService.schedule(()->{

// System.out.println("hello");

// },1000,TimeUnit.MILLISECONDS);

ExecutorService executorService = Executors.newFixedThreadPool(3);

for (int i = 0; i < 10; i++) {//放了10个任务

executorService.submit(()->{

System.out.println("hello" + new Date());

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

});

}

}

}

四、线程池的实现

1.核心操作为 submit, 将任务加入线程池中。 2.使用 Worker 类描述一个工作线程. 使用 Runnable 描述一个任务。 3.使用一个 BlockingQueue 组织所有的任务。 4.每个 worker 线程要做的事情: 不停的从 BlockingQueue 中取任务并执行。 5.指定一下线程池中的最大线程数 maxWorkerCount;当当前线程数超过这个最大值时,就不再新增线程了。  

class Worker extends Thread {

private LinkedBlockingQueue queue = null

public Worker(LinkedBlockingQueue queue)

super("worker");

this.queue = queue;

}

@Override

public void run() {

// try 必须放在 while 外头, 或者 while 里头应该影响

try {

while (!Thread.interrupted()) {

Runnable runnable = queue.take();

runnable.run();

}

} catch (InterruptedException e) {

}

}

}

public class MyThreadPool {

private int maxWorkerCount = 10;

private LinkedBlockingQueue queue = new LinkedBlockingQueue();

public void submit(Runnable command) {

if (workerList.size() < maxWorkerCount) {

// 当前 worker 数不足, 就继续创建 worker

Worker worker = new Worker(queue);

worker.start();

}

// 将任务添加到任务队列中

queue.put(command);

}

public static void main(String[] args) throws InterruptedException {

MyThreadPool myThreadPool = new MyThreadPool();

myThreadPool.execute(new Runnable() {

@Override

public void run() {

System.out.println("吃饭");

}

});

Thread.sleep(1000);

}

}

 

 

相关文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: