暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

来自读者的提问,为什么要把这个抛出异常的线程移除掉,再创建一个新线程放到线程池?

小罗技术笔记 2020-05-25
308

点击上方“小罗技术笔记”,关注公众号

第一时间送达实用干货

之前发表一篇文章

70%人答不全!线程池中的一个线程异常了会被怎么处理?

一个读者提出了这么个问题,特此写此文来说说线程池是如何实现复用的。


     线程重用的核心是,我们知道Thread.start()只能调用一次,一旦这个调用结束,则该线程就到了stop状态,不能再次调用start。那么要达到复用的目的,则必须从Runnable接口的run()方法上入手,那么应该这样设计这个Runnable.run()方法(就叫外面的run()方法):

      它本质上是个无限循环,跑的过程中不断检查我们是否有新加入的子Runnable对象(就叫内部的runnable:run()吧,它就是用来实现我们自己的任务),有就调一下我们的run(),其实就一个大run()把其它小run()#1,run()#2,...给串联起来了,基本原理就这么简单 不停地处理我们提交的Runnable任务。

  1. 伪代码:

  2. public void run() {

  3.    while(true) {

  4.        if(tasks available) {

  5.           Runnable task = taskqueue.dequeue();

  6.           task.run();

  7.        } else {

  8.           // wait or whatever

  9.        }

  10.    }

  11. }

大概的实现思路我们粗略的了解了一下,那再分析jdk中是如何实现线程复用的

先看ThreadPoolExecutor.execute()方法

  1.    public void execute(Runnable command) {

  2.        if (command == null)

  3.            throw new NullPointerException();

  4.        int c = ctl.get();

  5.        if (workerCountOf(c) < corePoolSize) {

  6.            if (addWorker(command, true))

  7.                return;

  8.            c = ctl.get();

  9.        }

  10.        if (isRunning(c) && workQueue.offer(command)) {

  11.            int recheck = ctl.get();

  12.            if (! isRunning(recheck) && remove(command))

  13.                reject(command);

  14.            else if (workerCountOf(recheck) == 0)

  15.                addWorker(null, false);

  16.        }

  17.        else if (!addWorker(command, false))

  18.            reject(command);

  19.    }

分析:可以看出:ThreadPoolExecutor.execute()的功能就是:

1、将任务添加至阻塞队列workQueue,workQueue.offer(command) 2、根据core和maxPool,选择是否创建Worker,addWorker()

因此,线程复用的实现应该在worker中,打开addWorker()方法观察,源代码太长了这里我只写分析结果 addworker分为两部分:
1、创建worker,
2、启动worker
 规则校验:与core和maxPool数量的规则相同,创建worker:获取ThreadLocal的全局锁。安全的创建Worker执行t.start();

so又回到了Worker的run方法上了!

  1. public void run() {

  2.            runWorker(this);

  3. }

  4. final void runWorker(Worker w) {

  5.        Thread wt = Thread.currentThread();

  6.        Runnable task = w.firstTask;

  7.        w.firstTask = null;

  8.        w.unlock(); // allow interrupts

  9.        boolean completedAbruptly = true;

  10.        try {

  11.            while (task != null || (task = getTask()) != null) {

  12.            }

  13.          }

  14.          ·······

  15.        }

分析:

1、一个大大的while循环,当我们的task不为空的时候它就永远在循环,并且会源源不断的从getTask()获取新的任务。

2、通过task.run();执行具体的任务。

3、正常情况,只有当所有任务执行完毕才会停止运行。

我们进一步分析getTask()和task.run()方法。在getTask() 有这么一个代码

  1. Runnable r = timed ?

  2.                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

  3.                    workQueue.take();

  4.                if (r != null)

  5.                    return r;

  6.                timedOut = true;

可以发现是从workQueue中获取task的,所以最终的问题就是看这个变量workQueue是谁的成员变量。

  1. public class ThreadPoolExecutor extends AbstractExecutorService {

  2.    private final BlockingQueue<Runnable> workQueue;

  3.   ···

  4. }

    通过以上逐步分析可以发现其实线程复用最核心的一点是,新建一个Worker内部类就会建一个线程,并且会把这个内部类本身传进去当作任务去执行,这个内部类的run方法里实现了一个while循环,当任务队列没有任务时结束这个循环,则这个线程就结束。

那为啥抛出异常的线程会被移除掉,再创建一个新线程放到线程池呢?

    其实很简单,源码里面 while内部的try/catch 会捕获任务的异常记录然后抛出,循环里面出异常了就会结束这个循环了直接走这个worker的结束方法了,而 processWorkerExit中会默认删除线程,此时出异常的线程可能还有任务待处理新建一个线程来补上,若有任务就继续处理剩下的。



长按二维码关注

点个在看再走呗!

文章转载自小罗技术笔记,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论