1、使用:
CyclicBarrier 中文名叫做屏障或者是栅栏,也可以用于线程间通信。
它可以等待 N 个线程都达到某个状态后继续运行的效果。
首先初始化线程参与者。
调用
await()
将会在所有参与者线程都调用之前等待。直到所有参与者都调用了
await()
后,所有线程从await()
返回继续后续逻辑。
2、原理:
是由ReentrantLock可重入锁和Condition共同实现的,里面有个计时器count,count为0时,才通过Condition唤醒线程,计数器不为0,通过Condition让线程睡眠
3、下面我将模仿jdk的CyclicBarrier实现方式,自己实现一遍
package com.jacky;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* Created by jacky on 2018/2/11.
*/
public class MyCyclicBarrier {
private int count;
private int parties;
private Runnable barrierAction;
private final ReentrantLock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
public MyCyclicBarrier(int parties,Runnable barrierAction){
if (parties <=0){
throw new IllegalArgumentException();
}
this.parties = parties;
this.count = parties;
this.barrierAction = barrierAction;
}
public int await() throws InterruptedException,BrokenBarrierException {
lock.lock();
try {
int index = --count;
if (index ==0){
if (null == barrierAction){
barrierAction.run();
}
condition.signalAll();
return index;
}
for (;;){
condition.await();
return index;
}
}finally {
lock.unlock();
}
}
}
接下来,我们测试一下
package com.jacky;
import com.sun.org.apache.xpath.internal.SourceTree;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* Created by jacky on 2018/2/11.
*/
public class CyclicBarrierDemo {
public static void main(String[] args) throws InterruptedException {
MyCyclicBarrier barrier = new MyCyclicBarrier(3, new Runnable() {
@Override
public void run() {
Thread thread = Thread.currentThread();
System.out.println("barrierAction start"+thread.getName());
try {
Thread.sleep((int)Math.random()*300);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("barrierAction start"+thread.getName());
}
});
Runnable runnable1 = new Runnable() {
@Override
public void run() {
try {
Thread.sleep((int)(Math.random()*100));
} catch (InterruptedException e) {
e.printStackTrace();
}
Thread thread = Thread.currentThread();
System.out.println("thread start:"+thread.getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("thread end:"+thread.getName());
}
};
Runnable runnable2 = new Runnable() {
@Override
public void run() {
try {
Thread.sleep((int)(Math.random()*200));
} catch (InterruptedException e) {
e.printStackTrace();
}
Thread thread = Thread.currentThread();
System.out.println("thread start:"+thread.getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("thread end:"+thread.getName());
}
};
Thread thread1 = new Thread(runnable1);
Thread thread2 = new Thread(runnable2);
Thread thread3 = new Thread(runnable1);
thread1.start();
thread2.start();
thread3.start();
Thread.sleep(2000);
System.out.println("thread1:"+thread1.getName()+","+thread1.getState());
System.out.println("thread2:"+thread2.getName()+","+thread2.getState());
System.out.println("thread3:"+thread3.getName()+","+thread3.getState());
}
}
文章转载自felix技术分享,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




