暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

AQS源码分析之Elasticsearch BaseFuture

开发架构二三事 2020-03-24
594

多线程模式

Java多线程编程中,常用的多线程设计模式包括:Future模式、Master-Worker模式、Guarded Suspeionsion模式、不变模式和生产者-消费者模式等

Future模式

Future模式对于多线程而言,如果线程A要等待线程B的结果,那么线程A没必要一直等待B,直到B有结果,可以先拿到一个未来的Future,等B有结果时再获取真实的结果。Future模式的核心在于:去除了主函数的等待时间,并使得原本需要等待的时间段可以用于处理其他业务逻辑。

Future模式有点类似于商品订单。在网上购物时,提交订单后,在收货的这段时间里无需一直在家里等候,可以先干别的事情。类推到程序设计中时,当提交请求时,期望得到答复时,如果这个答复可能很慢。传统的是一直持续等待直到这个答复收到之后再去做别的事情,但如果利用Future模式,其调用方式改为异步,而原先等待返回的时间段,在主调用函数中,则可以用于处理其他事务。

Future模式的应用

Future模式的应用场景非常多,比如在rxjava,guava,dubbo的client与server交互的response与request也是用future进行包装的,还有很多其他的场景中也都离不开Future的身影。下面我们顺便来看下Future在Elasticsearch中的应用。future的主要有以下几个方法:

  • cancel(boolean) :尝试取消任务的执行;

  • get():尝试获取任务执行的结果,是一个阻塞方法;

  • get(long,TimeUnit):获取任务执行结果的方法,阻塞到指定时间;

  • isDone():任务是否执行完成;

  • isCancelled():任务是否已经取消。

org.elasticsearch.common.util.concurrent.BaseFuture

我们先来分析一下它的内部类org.elasticsearch.common.util.concurrent.BaseFuture.Sync:
  1. /**

  2. 1. 遵循{@link AbstractQueuedSynchronizer}的约定,我们创建了一个私有子类来保存同步器。该同步器用于实现阻塞和等待调用以及以线程安全的方式处理状态更改。将来的当前状态保持为“同步”状态,并且只要状态更改为{@link #COMPLETED}或{@link #CANCELLED},就会释放锁定。

  3. 2. 为了避免进行释放和获取的线程之间的竞争,我们分两步过渡到最终状态。一个线程将成功地将CAS从RUNNING转换为COMPLETING,然后该线程将设置计算结果,然后才转换为COMPLETED或CANCELLED。

  4. 3. 我们不使用在aqs中acquire方法之间传递的整数参数,因此我们在各处传递-1来填充这个参数。

  5. */

  6. static final class Sync<V> extends AbstractQueuedSynchronizer {

  7. /* Valid states. */

  8. static final int RUNNING = 0;// 初始状态,AQS的state默认为0

  9. static final int COMPLETING = 1;

  10. static final int COMPLETED = 2;

  11. static final int CANCELLED = 4;


  12. private V value;

  13. private Throwable exception;


  14. /*

  15. * Acquisition succeeds if the future is done, otherwise it fails.

  16. */

  17. @Override

  18. protected int tryAcquireShared(int ignored) {//当future完成时获取成功,否则获取失败

  19. if (isDone()) {// future完成

  20. return 1;

  21. }

  22. return -1;

  23. }


  24. /*

  25. * We always allow a release to go through, this means the state has been

  26. * successfully changed and the result is available.

  27. */

  28. @Override

  29. protected boolean tryReleaseShared(int finalState) {// 释放

  30. // 设置最终状态

  31. setState(finalState);

  32. return true;

  33. }


  34. /**

  35. * Blocks until the task is complete or the timeout expires. Throws a

  36. * {@link TimeoutException} if the timer expires, otherwise behaves like

  37. * {@link #get()}.

  38. */

  39. V get(long nanos) throws TimeoutException, CancellationException,

  40. ExecutionException, InterruptedException {//阻塞直到任务完成或者时间超时


  41. // Attempt to acquire the shared lock with a timeout.

  42. /**

  43. 这里调用的是java.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquireSharedNanos方法

  44. public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)

  45. throws InterruptedException {

  46. if (Thread.interrupted())

  47. throw new InterruptedException();

  48. return tryAcquireShared(arg) >= 0 ||

  49. doAcquireSharedNanos(arg, nanosTimeout);

  50. }

  51. 1. 这里调用的是上面的tryAcquireShared方法,在任务完成时返回1,否则返回-1,当返回1时会直接返回true,不会执行doAcquireSharedNanos方法;

  52. 2. 如果任务没有完成,则tryAcquireShared(arg) >= 0为false,那么会进入doAcquireSharedNanos方法,在该方法中会在未超时的时间内调用tryAcquireShared方法再次尝试,也是在任务执行完成时返回true,否则进行阻塞直到超时释放并返回false。当两个结果都为false时会进入下面这个判断。

  53. */

  54. if (!tryAcquireSharedNanos(-1, nanos)) {

  55. throw new TimeoutException("Timeout waiting for task.");

  56. }

  57. // 如果能进行到这里,证明任务已经执行完成,可以获取到结果

  58. return getValue();

  59. }


  60. /**

  61. * Blocks until {@link #complete(Object, Throwable, int)} has been

  62. * successfully called. Throws a {@link CancellationException} if the task

  63. * was cancelled, or a {@link ExecutionException} if the task completed with

  64. * an error.

  65. */

  66. V get() throws CancellationException, ExecutionException,

  67. InterruptedException {

  68. /**

  69. 这里我们看下方法:

  70. public final void acquireSharedInterruptibly(int arg)

  71. throws InterruptedException {

  72. if (Thread.interrupted())

  73. throw new InterruptedException();

  74. if (tryAcquireShared(arg) < 0)

  75. doAcquireSharedInterruptibly(arg);

  76. }

  77. 1. tryAcquireShared方法在任务未完成时会返回-1,然后进入doAcquireSharedInterruptibly方法;

  78. 2. 再来看doAcquireSharedInterruptibly方法,里面会调用tryAcquireShared方法进行判断,如果任务完成则返回值为1,会直接返回,否则代表任务未完成,线程会被park,进行等待状态,直到complete方法被调用。

  79. */

  80. // Acquire the shared lock allowing interruption.

  81. acquireSharedInterruptibly(-1);

  82. return getValue();

  83. }


  84. /**

  85. * Implementation of the actual value retrieval. Will return the value

  86. * on success, an exception on failure, a cancellation on cancellation, or

  87. * an illegal state if the synchronizer is in an invalid state.

  88. */

  89. private V getValue() throws CancellationException, ExecutionException {

  90. int state = getState();

  91. switch (state) {

  92. case COMPLETED:

  93. if (exception != null) {

  94. throw new ExecutionException(exception);

  95. } else {

  96. // 返回执行结果

  97. return value;

  98. }


  99. case CANCELLED:

  100. throw new CancellationException("Task was cancelled.");


  101. default:

  102. throw new IllegalStateException(

  103. "Error, synchronizer in invalid state: " + state);

  104. }

  105. }


  106. /**

  107. * Checks if the state is {@link #COMPLETED} or {@link #CANCELLED}.

  108. */

  109. boolean isDone() {

  110. // 是否执行完成

  111. return (getState() & (COMPLETED | CANCELLED)) != 0;

  112. }


  113. /**

  114. * Checks if the state is {@link #CANCELLED}.

  115. */

  116. boolean isCancelled() {

  117. // 是否处于取消状态

  118. return getState() == CANCELLED;

  119. }


  120. /**

  121. * Transition to the COMPLETED state and set the value.

  122. */

  123. boolean set(@Nullable V v) {//过渡到COMPLETED状态并设置值

  124. // 设置状态

  125. return complete(v, null, COMPLETED);

  126. }


  127. /**

  128. * Transition to the COMPLETED state and set the exception.

  129. */

  130. boolean setException(Throwable t) {// 过渡到COMPLETED状态并设置异常

  131. return complete(null, t, COMPLETED);

  132. }


  133. /**

  134. * Transition to the CANCELLED state.

  135. */

  136. boolean cancel() {//过渡到CANCELLED状态

  137. return complete(null, null, CANCELLED);

  138. }


  139. /**

  140. * Implementation of completing a task. Either {@code v} or {@code t} will

  141. * be set but not both. The {@code finalState} is the state to change to

  142. * from {@link #RUNNING}. If the state is not in the RUNNING state we

  143. * return {@code false} after waiting for the state to be set to a valid

  144. * final state ({@link #COMPLETED} or {@link #CANCELLED}).

  145. *

  146. * @param v the value to set as the result of the computation.

  147. * @param t the exception to set as the result of the computation.

  148. * @param finalState the state to transition to.

  149. */

  150. private boolean complete(@Nullable V v, @Nullable Throwable t,

  151. int finalState) {

  152. // cas AQS的state从初始的RUNNING到COMPLETING

  153. boolean doCompletion = compareAndSetState(RUNNING, COMPLETING);

  154. if (doCompletion) {//如果cas成功了

  155. // If this thread successfully transitioned to COMPLETING, set the value

  156. // and exception and then release to the final state.

  157. this.value = v;//设置结果

  158. this.exception = t;// 设置异常

  159. /**

  160. public final boolean releaseShared(int arg) {

  161. if (tryReleaseShared(arg)) {

  162. doReleaseShared();// 在里面执行线程唤醒的任务,unparkSuccessor

  163. return true;

  164. }

  165. return false;

  166. }

  167. */

  168. releaseShared(finalState);// 设置最终状态

  169. } else if (getState() == COMPLETING) {// 如果是完成中

  170. // If some other thread is currently completing the future, block until

  171. // they are done so we can guarantee completion.

  172. acquireShared(-1);// 如果有线程处于完成中,阻塞直到他们全部完成

  173. }

  174. // 返回结果

  175. return doCompletion;

  176. }

  177. }

  • 遵循{@link AbstractQueuedSynchronizer}的约定,我们创建了一个私有子类来保存同步器。该同步器用于实现阻塞和等待调用以及以线程安全的方式处理状态更改。将来的当前状态保持为“同步”状态,并且只要状态更改为{@link #COMPLETED}或{@link #CANCELLED},就会释放锁定。

  • 为了避免进行释放和获取的线程之间的竞争,我们分两步过渡到最终状态。一个线程将成功地将CAS从RUNNING转换为COMPLETING,然后该线程将设置计算结果,然后才转换为COMPLETED或CANCELLED。

  • 我们不使用在aqs中acquire方法之间传递的整数参数,因此我们在各处传递-1来填充这个参数。

  • get(long nanos)方法,get()方法和complete方法都是可能会阻塞的,这些可以理解成都是属于client方法,即外层的方法,具体细节参考代码注释内容。

  • 唤醒操作依赖于 set(@Nullable V v)、setException(Throwable t)、cancel()等方法的调用,因为在这些方法中都调用了complete()方法并进行了state的更新。这些可以理解成处理任务的server端的方法,即内层通知外层任务执行完成的方法。

  • 有一点需要注意,这里可能会存在外层多个线程同时get的情况,如果此时任务已经完成,则结果立即返回,否则多个线程都会进入阻塞,然后在complete方法的releaseShared方法中进行unpark。

BaseFuture中的属性与方法一览:
  1. public abstract class BaseFuture<V> implements Future<V> {


  2. private static final String BLOCKING_OP_REASON = "Blocking operation";


  3. /**

  4. * Synchronization control for AbstractFutures.

  5. */

  6. private final Sync<V> sync = new Sync<>();


  7. @Override

  8. public V get(long timeout, TimeUnit unit) throws InterruptedException,

  9. TimeoutException, ExecutionException {

  10. assert timeout <= 0 || blockingAllowed();

  11. // 这里调用的是sync的get(timeout)方法

  12. return sync.get(unit.toNanos(timeout));

  13. }


  14. /**

  15. * {@inheritDoc}

  16. * <p>

  17. * The default {@link BaseFuture} implementation throws {@code

  18. * InterruptedException} if the current thread is interrupted before or during

  19. * the call, even if the value is already available.

  20. *

  21. * @throws InterruptedException if the current thread was interrupted before

  22. * or during the call (optional but recommended).

  23. * @throws CancellationException {@inheritDoc}

  24. */

  25. @Override

  26. public V get(long timeout, TimeUnit unit) throws InterruptedException,

  27. TimeoutException, ExecutionException {

  28. assert timeout <= 0 || blockingAllowed();

  29. // 调用的是sync的get(timeout)方法

  30. return sync.get(unit.toNanos(timeout));

  31. }


  32. /*

  33. * Improve the documentation of when InterruptedException is thrown. Our

  34. * behavior matches the JDK's, but the JDK's documentation is misleading.

  35. */


  36. /**

  37. * {@inheritDoc}

  38. * <p>

  39. * The default {@link BaseFuture} implementation throws {@code

  40. * InterruptedException} if the current thread is interrupted before or during

  41. * the call, even if the value is already available.

  42. *

  43. * @throws InterruptedException if the current thread was interrupted before

  44. * or during the call (optional but recommended).

  45. * @throws CancellationException {@inheritDoc}

  46. */

  47. @Override

  48. public V get() throws InterruptedException, ExecutionException {

  49. assert blockingAllowed();

  50. return sync.get();

  51. }


  52. // protected so that it can be overridden in specific instances

  53. protected boolean blockingAllowed() {// 用于es集群状态、处理线程等的校验,可以被使用的地方重写

  54. return Transports.assertNotTransportThread(BLOCKING_OP_REASON) &&

  55. ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON) &&

  56. ClusterApplierService.assertNotClusterStateUpdateThread(BLOCKING_OP_REASON) &&

  57. MasterService.assertNotMasterUpdateThread(BLOCKING_OP_REASON);

  58. }


  59. @Override

  60. public boolean isDone() {

  61. return sync.isDone();

  62. }


  63. @Override

  64. public boolean isCancelled() {

  65. return sync.isCancelled();

  66. }


  67. @Override

  68. public boolean cancel(boolean mayInterruptIfRunning) {

  69. if (!sync.cancel()) {

  70. return false;

  71. }

  72. done();

  73. if (mayInterruptIfRunning) {

  74. interruptTask();

  75. }

  76. return true;

  77. }


  78. // 模板方法

  79. protected void interruptTask() {

  80. }


  81. /**

  82. * Subclasses should invoke this method to set the result of the computation

  83. * to {@code value}. This will set the state of the future to

  84. * {@link BaseFuture.Sync#COMPLETED} and call {@link #done()} if the

  85. * state was successfully changed.

  86. *

  87. * @param value the value that was the result of the task.

  88. * @return true if the state was successfully changed.

  89. */

  90. protected boolean set(@Nullable V value) {

  91. boolean result = sync.set(value);

  92. if (result) {

  93. done();

  94. }

  95. return result;

  96. }

可以看到BaseFuture中的方法大都依赖于Sync来实现的。

总结

全篇设计的最妙的地方是用Sync包装了Future的方法,然后对tryAcquireShared方法和tryReleaseShared方法的重写也是整个设计的核心。以上纯属个人观点,不当之处还请指正。


文章转载自开发架构二三事,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论