暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

ThreadPoolExecutor源码解析

魔都码农 2019-12-11
357

源码基于JDK8

问题

  • 线程与进程

    • 线程:CPU调度的最小单元,cpu虚拟出来,利用线程充分利用cpu资源。

    • 进程:操作系统调度的最小单元。

  • 为什么需要创建线程池

    • 线程的创建和销毁都涉及到系统调用比较消耗资源,线程池的使用是避免频繁创建和销毁线程带来的开销。

  • 线程池如何创建

    • 根据需要创建线程池 Executors.newCachedThreadPool()

    • 创建固定线程池 Executors.newFixedThreadPool(int nThreads)

    • 创建单个线程池 Executors.newSingleThreadExecutor()

    • 创建 定时执行线程池 newScheduleThreadPool

    • 创建单个定时任务线程池 newSingleThreadScheduledExecutor

    • 以上物种底层都是通过ThreadPoolExecutor()设置不同参数封装后的结果。

  • 各个参数什么意思

    • 1:coreSize,核心线程数

    • 2:maxSize,最大线程数

    • 3:keepliveTime,生存时间

    • 4:TimeUnit:时间单位

    • 5:ThreadFactory

    • 6:BlockingQueue

    • 7:reject:拒绝策略。

  • 线程什么时候是否初始化

    • 1:可在线程池创建时做初始化,默认在excute或submit时创建

    • 2:初始化线程实际上是创建worker,每个worker也实现了impl runable接口。

    • 3:先吃对象有一个int类型的数据ctl,用于保存线程状态和线程个数。

  • 线程如何被执行

    • 1:通过执行submit和excute完成。

    • 2:线判断当前线程数是否小于coreSize,小于则addwoker

    • 3:如果线程数据大于等于coreSize,且线程状态为运行,则放入阻塞队列成功。放入阻塞队列成功会再次判断线程运行状态,状态非可运行则从队列中remove。

    • 4:如果阻塞队列offer失败,即队列被写满,则放置最大线程数据中。

    • 5:最大线程池中线程已满则执行拒绝策略。

  • 拒绝策略

    • 1:AbortPolicy丢弃任务并抛异常

    • 2:DiscardPolicy 直接丢弃任务,不抛异常

    • 3:DiscardOldestPolicy 丢弃老的任务,不跑异常 e.getQueue().poll();

    • 4:CallerRunsPolicy 由调用线程处理该任务。

    • 5:自定定义拒绝策略,实现拒绝接口RejectedExecutionHandler,重写rejectedExecution方法

  • 线程如何被释放,如何不被释放。

    • 1:有个配置allowableCorePollTimeOut,默认为false,当为true时且任务corePoll未全部执行时,corepoll多余线程会被移除。

    • getTask(),获取阻塞队列的值,当没有值时会被take住。

    • runWorker(),该方法会不断重getTask取值,直到为空时释放线程

    • getTask()返回空则表明线程数可以被释放,释放最大线程数据条件是从队列poll超时且工作线程数大于核心线程数

  • 为什么用阻塞队列

    • 1:用于任务等待和唤醒,当对列中没有消息时,会take()住,线程不会释放。

  • 阻塞队列种类(后面找个时间专门分析各个阻塞队列实现方式)

    • 1:直接提交SynchronousQueue

    • 2:无界队列LinkedBlockingQueue

    • 3:有界队列ArrayBlockingQueue

源码解析

  • 线程池对象初始化

    // 1:构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
{
// 参数校验,核心线程数必须大于0,最大线程数据必须大于等于核心线程数
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

  • 初始化核心线程

    //初始化核心线程:实际是创建worker
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}

  • 提交任务

    //2:任务添加:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// ctl AtomicInteger 且前三位为状态,后29为个线程个数
int c = ctl.get();

//1:当前线程数小于核心线程数则创建线程:即创建worker
if (workerCountOf(c) < corePoolSize) {
// true 表明为核心线程数,false表明为最大线程数
if (addWorker(command, true))
return;
c = ctl.get();
}

//2:当前线程数大于等于核心线程数,则将任务添加至队列中。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}

//队列放满,即offer失败,则创建最大线程
else if (!addWorker(command, false))
reject(command);//创建最大线程创建失败达到最大值则执行决绝策略
}

//3:创建线程 woker
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
//4:执行woker start方法:
public void run() {
runWorker(this);
}

  • 执行任务

    //5:执行任务
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//getTask() 从队列中获取数据,获取为null,则将此线程终止。
while (task != null || (task = getTask()) != null) {
//省略部分代码
//真正执行任务
task.run();
//省略部分代码
}
completedAbruptly = false;
} finally {
// 终止线程
processWorkerExit(w, completedAbruptly);
}
}

//6:获取任务

private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// allowCoreThreadTimeOut是否允许销毁核心线程,默认false
// 只有当使用到最大线程时,且最大线程大于核心线程时才为true
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

// 此处很关键
// 1:活跃线程数大于最大线程数
// 2:从阻塞队列通过poll获取返回空,则timeouttrue,且timedtrue
// 返回null,则会释放房前线程。
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
// 采用阻塞队列的关键在于
// 1:如果活跃数大于核心线程,则通过poll获取,一但队列为空获取超市,则返回timeOuttrue,共上面返回mjull
// 2:当活跃数不大于核心线程时,则通过take(),阻塞,直到队列中有值为止。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}



文章转载自魔都码农,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论