暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Executor框架

Java收录阁 2020-05-13
850

Executor框架包含的主要的类与接口如下:

  1. Executor:是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。

  2. ThreadPoolExecutor:线程池的核心实现类,用来执行被提交的任务。

  3. ScheduledThreadPoolExecutor:可以在给定的延迟后运行命令,或者定期执 行命令。ScheduledThreadPoolExecutor比Timer更灵活,功能更强大。

  4. Future和FutureTask:异步计算的结果

  5. Runnable和Callable:都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor执行。


主线程首先要创建实现Runnable或者Callable接口的任务对象。工具类Executors可以把一 个Runnable对象封装为一个Callable对象(Executors.callable(Runnable task)或 Executors.callable(Runnable task,Object resule))。

然后可以把Runnable对象直接交给ExecutorService执行ExecutorService.execute(Runnable command);或者也可以把Runnable对象或Callable对象提交给ExecutorService执行(ExecutorService.submit(Runnable task)或ExecutorService.submit(Callable task)。

如果执行ExecutorService.submit(…),ExecutorService将返回一个实现Future接口的对象FutureTask。由于FutureTask实现了Runnable,程序员也可 以创建FutureTask,然后直接交给ExecutorService执行。

最后,主线程可以执行FutureTask.get()方法来等待任务执行完成。主线程也可以执行
FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。

Executor框架的成员

Executor框架主要包括以下成员:ThreadPoolExecutor、ScheduledThreadPoolExecutor、 Future接口、Runnable接口、Callable接口和Executors。

ThreadPoolExecutor
ThreadPoolExecutor通常使用工厂类Executors来创建。Executors可以创建3种类型的ThreadPoolExecutor:SingleThreadExecutor、FixedThreadPool和CachedThreadPool。

FixedThreadPool
FixedThreadPool被称为可重用固定线程数的线程池,下面是FixedThreadPool的源码:

    public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
    }

    FixedThreadPool的corePoolSize和maximumPoolSize都被设置为创建FixedThreadPool时指 定的参数nThreads。
    当线程池中的线程数大于corePoolSize时,keepAliveTime为多余的空闲线程等待新任务的 最长时间,超过这个时间后多余的线程将被终止。这里把keepAliveTime设置为0L,意味着多余的空闲线程会被立即终止。FixedThreadPool的execute()方法的运行示意图如下:


    说明如下:

    1. 如果当前运行的线程数小于corePoolSize,则创建新线程来执行任务

    2. 在线程池完成预热之后(当前运行的线程数等于corePoolSize),将任务加入LinkedBlockingQueue

    3. 当线程执行完1中的任务后,会在循环中反复从linkedBlockingQueue中获取任务来执行

    FixedThreadPool使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列容量为Integer.MAX_VALUE),使用无界队列作为工作队列会对线程池带来如下影响:

    1. 当线程池中的线程数达到corePoolSize之后,新任务将在无界队列中等待,因此线程池中的线程永远不会超过corePoolSize

    2. 由于1, 使用无界队列时maximumPoolSize将是一个无效参数

    3. 由于1和2, 使用无界队列时keepAliveTime将是一个无效参数

    4. 由于使用无界队列,运行中的FixedThreadPool(未执行方法shutdown()或 shutdownNow())不会拒绝任务,不会调用RejectedExecutionHandler.rejectedExecution方法


    SingleThreadExecutor
    SingleThreadExecutor是使用单个worker线程的Executor:

      public static ExecutorService newSingleThreadExecutor() {
      return new FinalizableDelegatedExecutorService
      (new ThreadPoolExecutor(1, 1,
      0L, TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue<Runnable>()));
      }

      SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1,其他参数与FixedThreadPool相同。SingleThreadExecutor使用无界队列LinkedBlockingQueue作为线程池的工 作队列(队列的容量为Integer.MAX_VALUE);SingleThreadExecutor使用无界队列作为工作队列 对线程池带来的影响与FixedThreadPool相同

      说明如下:

      1. 如果当前运行的线程数少于corePoolSize(即线程池中无运行的线程),则创建一个新线程来执行任务

      2. 在线程池完成预热后(当前线程池中有一个运行的线程),将任务加入到LinkedBlockingQueue

      3. 线程执行完1中的任务后,会在一个无限循环中反复从LinkedBlockingQueue中获取任务来执行


      CachedThreadPool
      CachedThreadPool是一个会根据需要创建新线程的线程池:

        public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
        60L, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>());
        }

        CacheThreadPool的corePoolSize的值为0,即corePool为空;maximumPoolSize是Integer的最大值,也就是maximumPool是无界的;keepAliveTime设置成60秒意味着CacheThreadPool中空闲的线程等待任务最长的时间是60秒,超过60秒该线程就会被终止

        CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但maximumPool是无界的。这意味着,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下,
        CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。

        Callable/Future的使用以及原理

        线程池执行任务有两种方法,一种是execute,另一种是submit;这两个方法的区别如下:
        execute

        1. execute只可以接收一个Runnable的参数

        2. execute如果出现异常会抛出

        3. execute没有返回值

        submit

        1. submit 可以接收Runnable和Callable两种类型的参数

        2. 对于submit方法传入一个callable参数,可以得到一个Future的返回值

        3. submit 方法不会抛出异常,除非调用Future.get()方法

        Callable/Future案例

          public class CallableDemo implements Callable<String> {
          @Override
          public String call() throws Exception {
          return "Hello";
          }
          public static void main(String[] args) throws InterruptedException, ExecutionException {
          CallableDemo callableDemo = new CallableDemo();
          FutureTask futureTask = new FutureTask(callableDemo);
          new Thread(futureTask).start();
          System.out.println(futureTask.get());
          }
          }

          为什么要使用回调呢?因为结果值是由另外一个线程计算的,当前线程不知道结果什么时候能够计算完成,所以传递一个函数给计算线程,当计算结束时候调用这个回调接口,回传结果值。这种方式在很多地方都有用到,比如Dubbo的异步调用、消息中间件的异步通信等等。

          接下来看看Callable/Future是如何实现的,在刚刚的例子中我们用到了两个api,分别是Callable和FutureTask;Callable是一个函数式接口(一个有且仅有一个抽象方法,但是可以有多个非抽象方法的接口;可以被隐式转换为 lambda 表达式),里面只有一个call方法:

            @FunctionalInterface
            public interface Callable<V> {
            /**
            * Computes a result, or throws an exception if unable to do so.
            *
            * @return computed result
            * @throws Exception if unable to compute a result
            */
            V call() throws Exception;
            }

            FutureTask实现了RunnableFuture接口;RunnableFuture接口继承了Runnable和Future这两个接口:

              public interface RunnableFuture<V> extends Runnable, Future<V> {
              /**
              * Sets this Future to the result of its computation
              * unless it has been cancelled.
              */
              void run();
              }

              Future接口表示一个任务的生命周期,并提供了相应方法来判断任务是否已经完成、取消以及获取任务结果等:

                public interface Future<V> {


                boolean cancel(boolean mayInterruptIfRunning);


                boolean isCancelled();


                boolean isDone();

                V get() throws InterruptedException, ExecutionException;

                V get(long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException;
                }

                到这里我们可以知道,FutureTask就是Runnable和Future的结合,如果把Runnable比作生产者,Future就是消费者,那FutureTask就是被这两者共享的;生产者通过run方法计算结果,消费者通过get方法获取结果。

                作为生产者消费者模式,有一个很重要的机制就是如果生产者的数据还没有产生,消费者会被阻塞;当生产者数据准备好了以后会唤醒消费者继续执行。那么在FutureTask里面是基于什么方法实现的呢?

                通过源码我们可以看到,在FutureTask中通过变量state来维护状态,共用一下7中状态:

                  public class FutureTask<V> implements RunnableFuture<V> {


                  private volatile int state;
                  private static final int NEW = 0; // 新建状态,表示这个FutureTask还没有开始运行
                  private static final int COMPLETING = 1; // 表示FutureTask任务已经完成,但是还有一些后续操作还没有完成,比如唤醒等待线程
                  private static final int NORMAL = 2; // 任务结束,正常完成,没有异常
                  private static final int EXCEPTIONAL = 3; // 因为发生了异常,任务结束
                  private static final int CANCELLED = 4; // 因为取消了任务,任务结束
                  private static final int INTERRUPTING = 5; // 任务结束,也是由于取消了任务,但是发起了中断运行任务线程的中断请求
                  private static final int INTERRUPTED = 6; // 任务结束,也是由于取消了任务,完成了中断运行任务线程的中断请求
                  .....// 省略部分代码
                  }

                  run方法
                  run方法其实很简单,就是调用Callable的call方法返回结果值result,根据是否发生异常调用set(result)或者setException(ex);不过因为FutureTask任务都是在多线程环境下运行,所以需要注意并发冲突问题。在run方法中,并没有使用synchronized代码块或者lock来解决并发问题,而是通过CAS乐观锁来实现并发,确保只有一个线程能运行FutureTask任务。

                    public void run() {
                    // 如果状态不是New,或者设置runner的值失败,说明有其它线程已经调用了run方法并成功设置了runner的值,直接return
                    // 确保了只有一个线程可以运行try代码块中的代码
                    if (state != NEW ||
                    !UNSAFE.compareAndSwapObject(this, runnerOffset,
                    null, Thread.currentThread()))
                    return;
                    try {
                    Callable<V> c = callable;
                    if (c != null && state == NEW) { // 只有c不为null且状态为New的时候,才可以执行
                    V result;
                    boolean ran;
                    try {
                    result = c.call(); // 调用Callable的call方法,并得到返回值
                    ran = true; // 设置运行成功标识
                    } catch (Throwable ex) {
                    result = null;
                    ran = false; // 如果发生异常,设置运行成功标识位false
                    setException(ex); // 设置异常结果
                    }
                    if (ran)
                    set(result); // 如果成功,把结果赋值给outCome变量
                    }
                    } finally {
                    runner = null;
                    int s = state;
                    if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
                    }
                    }


                    protected void set(V v) {
                    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                    outcome = v;
                    UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
                    finishCompletion();
                    }
                    }

                    get方法
                    get方法就是通过阻塞获取线程结果,主要做了两件事:

                    1. 判断状态如果小于等于COMPLETING,表示FutureTask任务还没有执行完,则调用awaitDone方法等待

                    2. report返回结果值,或者抛出异常

                      public V get() throws InterruptedException, ExecutionException {
                      int s = state;
                      if (s <= COMPLETING)
                      s = awaitDone(false, 0L);
                      return report(s);
                      }

                      awaitDone方法
                      如果当前的任务还没有被执行完,则将当前线程加入到等待队列中,等到run方法执行完之后会唤醒继续执行:

                        private int awaitDone(boolean timed, long nanos) throws InterruptedException {
                        final long deadline = timed ? System.nanoTime() + nanos : 0L;
                        WaitNode q = null;
                        boolean queued = false; // 标识节点是否已经添加到等待队列
                        for (;;) {
                        if (Thread.interrupted()) {
                        // 如果当前线程中断标志位为true,则从列表中移除节点,并且抛出exception
                        removeWaiter(q);
                        throw new InterruptedException();
                        }


                        int s = state;
                        if (s > COMPLETING) { // 状态大于COMPLETING,标识FutureTask已经结束
                        if (q != null)
                        q.thread = null; // 将节点线程设置成null, 因为线程没有阻塞等待
                        return s;
                        }
                        else if (s == COMPLETING) // 任务已经完成,但是还有一些后续操作没有完成,当前线程让出执行权
                        Thread.yield();
                        else if (q == null) // 标识state为New,需要将当前线程阻塞等待, 创建waitNode节点
                        q = new WaitNode();
                        else if (!queued)
                        // 通过CAS将q添加到等到队列,如果失败则queued为false,下次循环会继续添加,知道成功
                        queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                        q.next = waiters, q);
                        else if (timed) { // timed为true表示要设置超时
                        nanos = deadline - System.nanoTime();
                        if (nanos <= 0L) {
                        removeWaiter(q);
                        return state;
                        }
                        LockSupport.parkNanos(this, nanos); // 让当前线程等待nanos时间
                        }
                        else
                        LockSupport.park(this);
                        }
                        }

                        report方法
                        report方法就是根据传入的状态值判断是返回结果还是抛出异常:

                          private V report(int s) throws ExecutionException {
                          Object x = outcome;
                          if (s == NORMAL)
                          return (V)x;
                          if (s >= CANCELLED) // 大于等于CANCELLED,表示手动去掉了FutureTask
                          throw new CancellationException();
                          throw new ExecutionException((Throwable)x);
                          }


                          文章转载自Java收录阁,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                          评论