前言:在编程中我们为什么要使用线程池,线程池中的线程是怎么执行任务的,线程池中的线程是如何复用和销毁的;

1 什么是线程池: 提前创建一些线程放到一个地方,使用的时候直接获取,避免频繁的创建和销毁线程,节省内存和CPU资源; 2 Java 中已有的线程池:

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);

ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();

ExecutorService cashedThreadPool = Executors.newCachedThreadPool();

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);

3 一个线程池构建需要的参数: 我们的任务是由某一个线程具体去执行的,所以我们就要定义好池中线程的数量,并且当任务数量增加时,可以开辟一些临时线程进行任务处理,当池中的线程已经是多余的时候再回收掉一些线程来节约资源;当池中的线程都有任务执行,这个时候来了新的任务,需要有个地方能把这些任务先行储存,以便当有空余的线程时在执行存储下来的任务,而且考虑到资源的情况这个存储任务的队列也最好是有限量的,如果超出了程序的处理能力,使用者可以自己决定拒绝策略; 所以在创建线程池的时候有必要以下参数:

public ThreadPoolExecutor(int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueue workQueue,

ThreadFactory threadFactory,

RejectedExecutionHandler handler) {

if (corePoolSize < 0 ||

maximumPoolSize <= 0 ||

maximumPoolSize < corePoolSize ||

keepAliveTime < 0)

throw new IllegalArgumentException();

if (workQueue == null || threadFactory == null || handler == null)

throw new NullPointerException();

this.corePoolSize = corePoolSize;

this.maximumPoolSize = maximumPoolSize;

this.workQueue = workQueue;

this.keepAliveTime = unit.toNanos(keepAliveTime);

this.threadFactory = threadFactory;

this.handler = handler;

corePoolSize: 正式存在的线程数; maximumPoolSize:允许存在的最大线程,扩容的线程数= maximumPoolSize-corePoolSize keepAliveTime:临时线程存活的时间 Unit:临时线程存活的时间单位 workQueue:阻塞队列 threadFactory:线程工厂 Handler:拒绝策略;

4 线程池任务的执行和线程销毁: demo:

ExecutorService executorService = Executors.newFixedThreadPool(2);

executorService.execute(new Thread( () -> testGetFutureMap("param")));

private static Map testGetFutureMap(String param) {

// 处理业务逻辑

Map mapData = new HashMap<>();

/**

* do some thing

*/

System.out.printf("do some thing");

return mapData;

}

ThreadPoolExecutor:execute 线程任务的执行

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

int c = ctl.get();

if (workerCountOf(c) < corePoolSize) {

// 当前工作的线程小于核心线程,直接新建线程,并且进行任务的执行

if (addWorker(command, true))

return;

c = ctl.get();

}

// 当前工作的线程大于核心线程,或者直接添加任务失败

if (isRunning(c) && workQueue.offer(command)) {

// 添加到队列中,等待后续空闲线程执行任务

int recheck = ctl.get();

if (! isRunning(recheck) && remove(command))

reject(command);

else if (workerCountOf(recheck) == 0)

addWorker(null, false);

}

// 当前工作的线程大于核心线程,或者直接添加任务失败,并且添加队列失败

// 开启临时线程进行任务处理

else if (!addWorker(command, false))

// 如果失败执行拒绝策略

reject(command);

}

addWorker:线程任务的执行

private boolean addWorker(Runnable firstTask, boolean core) {

retry:

for (;;) {

int c = ctl.get();

int rs = runStateOf(c);

// Check if queue empty only if necessary.

// 判断当前线程池是否可用

if (rs >= SHUTDOWN &&

! (rs == SHUTDOWN &&

firstTask == null &&

! workQueue.isEmpty()))

return false;

for (;;) {

int wc = workerCountOf(c);

// 判断当前工作线程数和核心线程数或者最大线程数

if (wc >= CAPACITY ||

wc >= (core ? corePoolSize : maximumPoolSize))

return false;

// 增加工作线程数,增加成功,直接跳出for 循环

if (compareAndIncrementWorkerCount(c))

break retry;

c = ctl.get(); // Re-read ctl

// 增加线程数失败,进行重试

if (runStateOf(c) != rs)

continue retry;

// else CAS failed due to workerCount change; retry inner loop

}

}

boolean workerStarted = false;

boolean workerAdded = false;

Worker w = null;

try {

// 创建线程,传入任务

// 执行任务 new Thread(new Worker()).start();

w = new Worker(firstTask);

final Thread t = w.thread;

if (t != null) {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

// Recheck while holding lock.

// Back out on ThreadFactory failure or if

// shut down before lock acquired.

int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||

(rs == SHUTDOWN && firstTask == null)) {

if (t.isAlive()) // precheck that t is startable

throw new IllegalThreadStateException();

// 添加任务

workers.add(w);

int s = workers.size();

if (s > largestPoolSize)

largestPoolSize = s;

workerAdded = true;

}

} finally {

mainLock.unlock();

}

if (workerAdded) {

// 启动线程执行任务

t.start();

workerStarted = true;

}

}

} finally {

if (! workerStarted)

// 添加任务失败,工作任务数量-1,移除任务

addWorkerFailed(w);

}

return workerStarted;

}

compareAndIncrementWorkerCount:增加线程池中的线程数量

private boolean compareAndIncrementWorkerCount(int expect) {

// 工作线程数量+1

return ctl.compareAndSet(expect, expect + 1);

}

构建Worker用于具体任务的执行:

Worker(Runnable firstTask) {

setState(-1); // inhibit interrupts until runWorker

this.firstTask = firstTask;

this.thread = getThreadFactory().newThread(this);

}

执行任务 t.start() ,调用Worker run():

public void run() {

runWorker(this);

}

finalvoid runWorker(Worker w) {

Thread wt = Thread.currentThread();

Runnable task = w.firstTask;

w.firstTask = null;

w.unlock(); // allow interrupts

boolean completedAbruptly = true;

try {

while (task != null || (task = getTask()) != null) {

// 任务执行,没有任务的时候不进入while 循环

w.lock();

// If pool is stopping, ensure thread is interrupted;

// if not, ensure thread is not interrupted. This

// requires a recheck in second case to deal with

// shutdownNow race while clearing interrupt

// 线程不可用

if ((runStateAtLeast(ctl.get(), STOP) ||

(Thread.interrupted() &&

runStateAtLeast(ctl.get(), STOP))) &&

!wt.isInterrupted())

wt.interrupt();

try {

// 在任务执行之前,此方法可以被重写

beforeExecute(wt, task);

Throwable thrown = null;

try {

// 执行任务

task.run();

} catch (RuntimeException x) {

thrown = x; throw x;

} catch (Error x) {

thrown = x; throw x;

} catch (Throwable x) {

thrown = x; throw new Error(x);

} finally {

// 任务执行以后

afterExecute(task, thrown);

}

} finally {

// 执行任务后

task = null;

w.completedTasks++;

w.unlock();

}

}

completedAbruptly = false;

} finally {

// 线程的退出,减少工作线程数量

processWorkerExit(w, completedAbruptly);

}

}

获取任务 getTask():

private Runnable getTask() {

boolean timedOut = false; // Did the last poll() time out?

for (;;) {

int c = ctl.get();

int rs = runStateOf(c);

// Check if queue empty only if necessary.

// 线程池不可用

if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

decrementWorkerCount();

return null;

}

int wc = workerCountOf(c);

// Are workers subject to culling?

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))

&& (wc > 1 || workQueue.isEmpty())) {

// 当前工作线程大于核心线程,并且没有了任务,则将线程池中线程数量-1

if (compareAndDecrementWorkerCount(c))

return null;

continue;

}

try {

// 当前工作线程大于核心线程数 则进行规定时间内获取任务(规定时间内没有获取到则说明当前没有需要执行的任务)

// 否则直接获取任务

Runnable r = timed ?

workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

workQueue.take();

if (r != null)

return r;// 返回任务

timedOut = true;

} catch (InterruptedException retry) {

timedOut = false;

}

}

}

任务执行完毕后退出线程,processWorkerExit:

private void processWorkerExit(Worker w, boolean completedAbruptly) {

if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted

decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

// 增加任务完成的数量

completedTaskCount += w.completedTasks;

// 移除任务

workers.remove(w);

} finally {

mainLock.unlock();

}

tryTerminate();

int c = ctl.get();

if (runStateLessThan(c, STOP)) {

if (!completedAbruptly) {

int min = allowCoreThreadTimeOut ? 0 : corePoolSize;

if (min == 0 && ! workQueue.isEmpty())

min = 1;

if (workerCountOf(c) >= min)

return; // replacement not needed

}

addWorker(null, false);

}

}

以上为线程池中线程创建,执行任务,以及销毁线程的过程,流程图如下: 过程: (1)当提交任务后,如果当前工作的线程没有超过核心线程,则创建线程然后进行任务的执行; (2)当工作的现车超过核心线程数,则尝试添加到阻塞队列中,添加成功后,有空闲线程时从队列中获取任务并执行; (3)如果阻塞队列已满,则判断是否要增加临时线程处理任务,如果已经达到最大线程数,则执行拒绝策略;否则创建临时线程,执行任务; (4)当任务执行完毕后,如果没有了任务,并且当前工作的线程大于核心小程,则执行线程的销毁;

5 总结: 5.1 线程池的创建是为了避免线程频繁的创建和销毁,是为了线程的复用,增加线程池中的线程可以提高任务执行的效率,但是线程池中线程过多会造成频繁的上下文切换,所以线程数量并不是越多越好; 5.2 我们可以自定义线程池,通过重写方法的方式,更好的监控线程的执行:

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

public class ExecutorServiceMonitor extends ThreadPoolExecutor {

public ExecutorServiceMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) {

super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);

}

// 线程执行任务之前

@Override

protected void beforeExecute(Thread t, Runnable r) {

super.beforeExecute(t, r);

}

// 线程执行任务之后

@Override

protected void afterExecute(Runnable r, Throwable t) {

super.afterExecute(r, t);

}

}

文章链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: