暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

【细谈Java并发】谈谈CyclicBarrier

蹲厕所的熊 2018-05-07
230

蹲厕所的熊 转载请注明原创出处,谢谢!

1、简介

CyclicBarrier是一个同步工具类,它允许一组线程在到达某个栅栏点(common barrier point)互相等待,发生阻塞,直到最后一个线程到达栅栏点,栅栏才会打开,处于阻塞状态的线程恢复继续执行.它非常适用于一组线程之间必需经常互相等待的情况。CyclicBarrier字面理解是循环的栅栏,之所以称之为循环的是因为在等待线程释放后,该栅栏还可以复用。

建议阅读CyclicBarrier源码前,先深入研究一下ReentrantLock的原理,搞清楚condition里await和signal的原理,这部分可以看我之前的文章:【细谈Java并发】谈谈AQS 和 【细谈Java并发】谈谈ReentrantLock

好了,我们来看看如何使用它吧。

2、使用场景

我们可以简单使用CyclicBarrier来模拟一下对战平台中玩家需要完全准备好了,才能进入游戏的场景。

  1. public class CyclicBarrierTest {


  2.    private final static ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(5);

  3.    private final static CyclicBarrier BARRIER = new CyclicBarrier(5);


  4.    public static void main(String[] args) {

  5.        for (int i = 0; i < 5; i++) {

  6.            final String name = "玩家" + i;

  7.            EXECUTOR_SERVICE.execute(new Runnable() {

  8.                @Override

  9.                public void run() {

  10.                    try {

  11.                        Thread.sleep(2000);

  12.                        System.out.println(name + "已准备,等待其他玩家准备...");

  13.                        BARRIER.await();

  14.                        Thread.sleep(1000);

  15.                        System.out.println(name + "已加入游戏");

  16.                    } catch (InterruptedException e) {

  17.                        System.out.println(name + "离开游戏");

  18.                    } catch (BrokenBarrierException e) {

  19.                        System.out.println(name + "离开游戏");

  20.                    }

  21.                }

  22.            });

  23.        }

  24.        EXECUTOR_SERVICE.shutdown();

  25.    }

  26. }

输出结果

  1. 玩家1已准备,等待其他玩家准备...

  2. 玩家0已准备,等待其他玩家准备...

  3. 玩家2已准备,等待其他玩家准备...

  4. 玩家3已准备,等待其他玩家准备...

  5. 玩家4已准备,等待其他玩家准备...

  6. 玩家2已加入游戏

  7. 玩家3已加入游戏

  8. 玩家4已加入游戏

  9. 玩家0已加入游戏

  10. 玩家1已加入游戏

3、原理分析

3.1、属性

首先看看它里面的所有属性。

  1. public class CyclicBarrier {

  2.    private static class Generation {

  3.        boolean broken = false;

  4.    }

  5.    // 锁

  6.    private final ReentrantLock lock = new ReentrantLock();

  7.    // 通过lock得到的一个状态变量,用来await和signal

  8.    private final Condition trip = lock.newCondition();

  9.    // 通过构造器传入的参数,表示总的等待线程的数量

  10.    private final int parties;

  11.    // 当屏障正常打开后运行的程序,通过最后一个调用await的线程来执行

  12.    private final Runnable barrierCommand;

  13.    // 当前的Generation。每当屏障失效或者开闸之后都会自动替换掉。从而实现重置的功能

  14.    private Generation generation = new Generation();

  15.    // 和parties一样,每次线程await后减1

  16.    private int count;

  17.    ...省略后面代码

  18. }

3.2、构造方法

  1. public CyclicBarrier(int parties, Runnable barrierAction) {

  2.    if (parties <= 0) throw new IllegalArgumentException();

  3.    this.parties = parties;

  4.    this.count = parties;

  5.    this.barrierCommand = barrierAction;

  6. }


  7. public CyclicBarrier(int parties) {

  8.    this(parties, null);

  9. }

  1. 默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier已经到达屏障位置,线程被阻塞。

  2. 另外一个构造方法CyclicBarrier(int parties, Runnable barrierAction),其中barrierAction任务会在所有线程到达屏障后执行。

3.3、await()

最主要的方法就是await()方法,调用await()的线程会等待直到有足够数量的线程调用await——也就是开闸状态。

  1. public int await() throws InterruptedException, BrokenBarrierException {

  2.    try {

  3.        return dowait(false, 0L);

  4.    } catch (TimeoutException toe) {

  5.        throw new Error(toe); // cannot happen

  6.    }

  7. }


  8. public int await(long timeout, TimeUnit unit)

  9.        throws InterruptedException,

  10.        BrokenBarrierException,

  11.        TimeoutException {

  12.    return dowait(true, unit.toNanos(timeout));

  13. }

await()和await(long, TimeUnit)都是调用dowait方法,区别就是参数不同,我们来看看dowait方法。

  1. private int dowait(boolean timed, long nanos)

  2.        throws InterruptedException, BrokenBarrierException,

  3.        TimeoutException {

  4.    final ReentrantLock lock = this.lock;

  5.    lock.lock();

  6.    try {

  7.        final Generation g = generation;


  8.        if (g.broken)    // 如果当前Generation是处于打破状态则传播这个BrokenBarrierExcption

  9.            throw new BrokenBarrierException();


  10.        if (Thread.interrupted()) {

  11.            // 如果当前线程被中断则使得当前generation处于打破状态,重置剩余count。

  12.            // 并且唤醒状态变量。这时候其他线程会传播BrokenBarrierException。

  13.            breakBarrier();

  14.            throw new InterruptedException();

  15.        }


  16.        int index = --count;    // 尝试降低当前count

  17.        /**

  18.         * 如果当前状态将为0,则Generation处于开闸状态。运行可能存在的command,

  19.         * 设置下一个Generation。相当于每次开闸之后都进行了一次reset。

  20.         */

  21.        if (index == 0) {  // tripped

  22.            boolean ranAction = false;

  23.            try {

  24.                final Runnable command = barrierCommand;

  25.                if (command != null)

  26.                    command.run();

  27.                ranAction = true;

  28.                nextGeneration();

  29.                return 0;

  30.            } finally {

  31.                if (!ranAction)    // 如果运行command失败也会导致当前屏障被打破。

  32.                    breakBarrier();

  33.            }

  34.        }


  35.        // loop until tripped, broken, interrupted, or timed out

  36.        for (;;) {

  37.            try {

  38.                if (!timed)    // 阻塞在当前的状态变量。

  39.                    trip.await();

  40.                else if (nanos > 0L)

  41.                    nanos = trip.awaitNanos(nanos);

  42.            } catch (InterruptedException ie) {

  43.                if (g == generation && ! g.broken) {    // 如果当前线程被中断了则使得屏障被打破。并抛出异常。

  44.                    breakBarrier();

  45.                    throw ie;

  46.                } else {

  47.                    Thread.currentThread().interrupt();

  48.                }

  49.            }


  50.            // 从阻塞恢复之后,需要重新判断当前的状态。

  51.            if (g.broken)

  52.                throw new BrokenBarrierException();


  53.            if (g != generation)

  54.                return index;


  55.            if (timed && nanos <= 0L) {

  56.                breakBarrier();

  57.                throw new TimeoutException();

  58.            }

  59.        }

  60.    } finally {

  61.        lock.unlock();

  62.    }

  63. }

此外再看下两个小过程:

这两个小过程当然是需要锁的,但是由于这两个方法只是通过其他方法调用,所以依然是在持有锁的范围内运行的。这两个方法都是对域进行操作。

nextGeneration实际上在屏障开闸之后重置状态。以待下一次调用。 breakBarrier实际上是在屏障打破之后设定打破状态,以唤醒其他线程并通知。

  1. private void nextGeneration() {

  2.    trip.signalAll();

  3.    count = parties;

  4.    generation = new Generation();

  5. }


  6. private void breakBarrier() {

  7.    generation.broken = true;

  8.    count = parties;

  9.    trip.signalAll();

  10. }

3.4、reset

reset方法比较简单。但是这里还是要注意一下要先打破当前屏蔽,然后再重建一个新的屏蔽。否则的话可能会导致信号丢失。

  1. public void reset() {

  2.    final ReentrantLock lock = this.lock;

  3.    lock.lock();

  4.    try {

  5.        breakBarrier();   // break the current generation

  6.        nextGeneration(); // start a new generation

  7.    } finally {

  8.        lock.unlock();

  9.    }

  10. }

4、CountDownLatch的区别

我用白话说的通俗点吧。

  1. CountDownLatch的使用是一次性的,而CyclicBarrier可以用reset进行重用。

  2. CountDownLatch是一个线程等待多个线程执行完了,再进行执行。而CyclicBarrier是多个线程等待所有线程都执行完了,再进行执行。



如果读完觉得有收获的话,欢迎点赞、关注、加公众号【蹲厕所的熊】,查阅更多精彩历史!!!

文章转载自蹲厕所的熊,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论