暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Java 并发流程工具的实战探索

77

写在文章开头

在当今数字化时代,软件系统面临着日益增长的高并发需求。无论是大型电商平台在促销活动时处理海量的订单请求,还是在线游戏服务器同时承载众多玩家的交互操作,高效的并发处理能力都成为了系统性能和稳定性的关键因素。 Java作为一门广泛应用于企业级开发的编程语言,提供了丰富且强大的并发流程工具。这些工具犹如精巧的齿轮,相互配合,使开发者能够在多线程环境下有条不紊地控制程序的执行流程,确保各个线程之间协调运作,高效完成复杂的任务。 本文将踏上Java并发流程工具的实战探索之旅。我们不仅会深入剖析这些工具的核心原理,更会通过实际代码示例,详细展示它们在不同应用场景中的具体应用。从简单的线程同步控制,到复杂的多阶段任务协调,一步步揭开Java并发流程工具的神秘面纱,帮助读者掌握在实际项目中运用这些工具优化程序性能、提升系统可靠性的技巧。

Hi,我是 sharkChili ,是个不断在硬核技术上作死的技术人,是 CSDN的博客专家 ,也是开源项目 Java Guide 的维护者之一,熟悉 Java 也会一点 Go ,偶尔也会在 C源码 边缘徘徊。写过很多有意思的技术博客,也还在研究并输出技术的路上,希望我的文章对你有帮助,非常欢迎你关注我的公众号: 写代码的SharkChili 。


同时也非常欢迎你star我的开源项目mini-redis:https://github.com/shark-ctrl/mini-redis

因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注  “加群”  即可和笔者和笔者的朋友们进行深入交流。

CountDownLatch

详解CountDownLatch工作流程

笔者一般称CountDownLatch
为倒计时门闩,它主要用于需要某些条件下才能唤醒的需求场景,例如我们线程1必须等到线程2做完某些事,那么就可以设置一个CountDownLatch
并将数值设置为1,一旦线程2完成业务逻辑后,将数值修改为0,此时线程1就会被唤醒:

模拟等待工作完成

通过上述的描述可能有点抽象,我们直接通过几个例子演示一下,我们现在有这样一个需求,希望等待5个线程完成之后,打印输出一句工作完成:

对应的代码示例如下,可以看到我们创建了数值为5的CountDownLatch
 ,一旦线程池里的线程完成工作后就调用countDown
进行扣减,一旦数值变为0,主线程await就会放行,执行后续输出:

int workerSize = 5;
        CountDownLatch workCount = new CountDownLatch(workerSize);
        ExecutorService threadPool = Executors.newFixedThreadPool(workerSize);

        for (int i = 0; i < workerSize; i++) {
            final int workerNum = i;
            //5个工人输出完成工作后,扣减倒计时门闩数
            threadPool.submit(() -> {
                log.info("worker[{}]完成手头的工作", workerNum);
                workCount.countDown();
            });
        }

        try {
            //阻塞当前线程(主线程)往后走,只有倒计时门闩变为0之后才能继续后续逻辑
            log.info("等待worker工作完成");
            workCount.await();
        } catch (InterruptedException e) {
            log.info("倒计时门闩阻塞失败,失败原因[{}]", e.getMessage(), e);
        }

        threadPool.shutdown();
        while (!threadPool.isTerminated()) {

        }

        log.info("所有工人都完成手头的工作了");

对应的我们也给出输出结果,可以看到主线程在线程池线程完成后才输出:

模拟运动员赛跑

实际上CountDownLatch
可以让多个线程进行等待,我们不妨用线程模拟一下所有运动员就绪后,等待枪响后起跑的场景:

代码如下,每当运动员即线程池的线程准备就绪,则调用await等待枪响,一旦所有运动员就绪之后,主线程调用countDown
模拟枪响,然后运动员起跑:

public static void main(String[] args) {
        log.info("百米跑比赛开始");

        int playerNum = 3;
        CountDownLatch gun = new CountDownLatch(1);
        ExecutorService threadPool = Executors.newFixedThreadPool(playerNum);
        
        for (int i = 0; i < playerNum; i++) {
            final int playNo = i;
            
            threadPool.submit(() -> {
                log.info("[{}]号运动员已就绪", playNo);
                try {
                    gun.await();
                } catch (InterruptedException e) {
                    log.info("[{}]号运动员线程阻塞失败,失败原因[{}]", playNo, e.getMessage(), e);
                }
                log.info("[{}]号运动员已经到达重点", playNo);
            });
        }

        //按下枪 所有运动员起跑
        gun.countDown();

        threadPool.shutdown();
        while (!threadPool.isTerminated()) {

        }

        log.info("百米赛跑已结束");
    }

对应的我们也给出相应的输出结果:

从源码角度分析CountDownLatch工作流程

我们以等待所有工人完成工作的例子进行解析,实际上在CountDownLatch
是通过state和一个抽象队列即aqs
完成多线程之间的流程调度,主线程调用await
方法等待其他worker
线程,如果其它worker
线程没有完成工作,那么CountDownLatch
就会将其存入抽象队列中。

一旦其他线程将state
设置为0时,await
对应的线程就会从抽象队列中释放并唤醒:

对应我们给出countDown
的实现,可以看到该方法底层就是将aqs队列中的state
进行扣减:

public void countDown() {
        sync.releaseShared(1);
    }

//releaseShared内部核心逻辑就是将state扣减1
protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                //扣减state并通过cas修改赋值
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

countDown
本质上就是查看这个state
,如果state
被扣减为0,则调用aqs
底层doReleaseShared
方法将队列中等待线程唤醒:

public void countDown() {
        sync.releaseShared(1);
    }


public final boolean releaseShared(int arg) {
  //查看是否扣减为0
        if (tryReleaseShared(arg)) {
        //如果是0则将当前等待线程唤醒
            doReleaseShared();
            return true;
        }
        return false;
    }

上文讲解countDown涉及一些关于AQS的实用理解和设计,关于更多AQS的知识点,感兴趣的读者可以阅读一下笔者的这篇文章:

AQS 源码解析:原理与实践:https://mp.weixin.qq.com/s/vz4TctsA0JVjfYws9gfGqQ

Semaphore

详解Semaphore

信号量多用于限流的场景,例如我们希望单位时间内只能有一个线程工作,我们就可以使用信号量,只有拿到线程的信号量才能工作,工作完成后释放信号量,其余线程才能争抢这个信号量并进行进一步的操作。 对应我们给出下面这段代码,可以看到生命信号量数值为6,每当线程拿到3个信号量之后就会执行业务操作,完成后调用release
释放3个令牌,让其他线程继续争抢:

//设置可复用的信号量,令牌数为3
        Semaphore semaphore = new Semaphore(6true);
        //创建5个线程
        int workSize = 5;
        ExecutorService executorService = Executors.newFixedThreadPool(workSize);


        for (int i = 0; i < workSize; i++) {
            executorService.submit(() -> {
                try {
                    //拿3个令牌
                    semaphore.acquire(3);

                    log.info("进行业务逻辑处理.......");
                    ThreadUtil.sleep(1000);

                    //释放3个令牌
                    semaphore.release(3);

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        executorService.shutdown();
        while (!executorService.isTerminated()) {

        }

对应输出结果如下,可以看到每个线程拿到令牌后都会休眠1秒,从输出结果来看每秒只有两个线程才工作,符合我们的限流需求:

详解Semaphore工作原理

Semaphore
底层也是用到的aqs
队列,线程进行资源获取时也是通过查看state
是否足够,在明确足够的情况下进行state扣减,然后进行工作。如果线程发现state
数量不够,那么就会被Semaphore
存入aqs底层的抽象队列中,直到state
数量足够后被唤醒:

对此我们给出Semaphore底层的acquire的逻辑可以看到,它会读取state数值然后进行扣减,如果剩余数量大于0则说明令牌获取成功线程可以执行后续逻辑,反之说明当前令牌数不够,外部逻辑会将该线程挂到等待队列中,等待令牌足够后将其唤醒:

protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                //读取可用的state    
                int available = getState();
                //计算剩余的state
                int remaining = available - acquires;
                //如果小于0说明令牌数不足直接返回出去,让外部将线程挂起,反之通过cas修改剩余数,返回大于0的结果让持有令牌的线程执行后续逻辑
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

Semaphore使用注意事项

  1. 获取和释放的时候都可以指定数量,但是要保持一致。
  2. 公平性设置为true会更加合理
  3. 并不必须由获取许可证的线程释放许可证。可以是A获取,B释放。

Condition

详解Condition

Condition
即条件对象,不是很常用或者直接用到的对象,常用于线程等待唤醒操作,例如A线程需要等待某个条件的时候,我们可以通过condition.await()
方法,A线程就会进入阻塞状态。

线程B
执行condition.signal()
方法,则JVM
就会从被阻塞线程中找到等待该condition
的线程。线程A收到可执行信号的时候,他的线程状态就会变成Runnable
可执行状态。

对此我们给出代码示例,可以看到我们从ReentrantLock
 中拿到一个Condition
 对象,让创建的线程进入等待状态,随后让主线程调用condition
 的signal
将其唤醒:

private ReentrantLock lock = new ReentrantLock();
    //条件对象,操控线程的等待和通知
    private Condition condition = lock.newCondition();

    public void waitCondition() throws InterruptedException {
        lock.lock();
        try {
            log.info("等待达到条件后通知");
            condition.await();
            log.info("收到通知,开始执行业务逻辑");
        } finally {
            lock.unlock();
            log.info("执行完成,释放锁");
        }
    }


    public void notifyCondition() throws InterruptedException {
        lock.lock();
        try {
            log.info("达到条件发起通知");
            condition.signal();
            log.info("发起通知结束");
        } finally {
            lock.unlock();
            log.info("发起通知执行完成,释放锁");
        }
    }


    public static void main(String[] args) throws InterruptedException {
        Main obj = new Main();

        new Thread(() -> {
            try {
                obj.waitCondition();
                //让出CPU时间片,交给主线程发起通知
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                log.error("等待条件通知设置失败,失败原因 [{}]", e.getMessage(), e);
            }
        }).start();

        //休眠3s唤醒等待线程
        Thread.sleep(3000);
        obj.notifyCondition();
    }

对应的我们也给出输出结果:

基于条件对象完成生产者、消费者模式

我们假设用一个队列存放一波生产者生产的资源,当资源满了通知消费者消费。当消费者消费空了,通知生产者生产。

所以这时候使用condition
控制流程最合适(这也是阻塞的队列内部的实现),所以我们要定义两个信号,分别为:

  1. 当资源被耗尽,我们就使用资源未满条件(notFull
    ): 调用signal
    通知生产者消费,消费者调用await
    进入等待。
  2. 当资源被填满,使用资源为空条件(notEmpty
    ):将生产者用await
    方法挂起,消费者用signal
    唤醒消费告知非空。

很明显生产者和消费者本质上就是基于这两个标识分别标志自己的等待时机和通知时机,以生产者为例,即每生产一个资源后就可以调用notEmpty
通知消费者消费,当生产者速度过快,则用await等待未满notFull
条件阻塞:

首先我们给出生产者和消费者条件和资源队列声明,基于上述条件我们给出一个经典的生产者和消费者模式的示例,我们首先给出生产者代码,可以看到资源满的时候调用notFull.await();
将自己挂起等待未满,生产资源后调用 notEmpty.signal();
通知消费者消费。

对应消费者示例代码也是一样,当资源消费完全,调用notEmpty.await();
等待不空,一旦消费定量资源调用notFull.signal();
通知生产者生产。

最终代码示例如下:

@Slf4j
public class ProducerMode {

    //锁
    private static ReentrantLock lock = new ReentrantLock();
    // 资源未满
    private Condition notFull = lock.newCondition();
    //资源为空
    private Condition notEmpty = lock.newCondition();

    private Queue<Integer> queue = new PriorityQueue<>(10);
    private int queueMaxSize = 10;

    /**
     * 生产者
     */

    private class Producer extends Thread {
        @Override
        public void run() {

            while (true) {
                lock.lock();

                try {
                    if (queueMaxSize == queue.size()) {
                        log.info("当前队列已满,通知消费者消费");
                        //等待不满条件触发
                        notFull.await();

                    }

                    queue.offer(1);
                    log.info("生产者补货,当前队列有 【{}】", queue.size());
                    //通知消费者队列不空,可以消费
                    notEmpty.signal();
                } catch (Exception e) {
                    log.error("生产者报错,失败原因 [{}]", e.getMessage(), e);
                } finally {
                    lock.unlock();
                }


            }
        }



    }

    /**
     * 消费者
     */

    private class Consumer extends Thread {
        @Override
        public void run() {

            while (true) {
                lock.lock();

                try {
                    if (0 == queue.size()) {
                        log.info("当前队列已空,通知生产者补货");
                        //等待不空条件达到
                        notEmpty.await();

                    }

                    queue.poll();
                    //通知消费者不满了
                    notFull.signal();
                    log.info("消费者完成消费,当前队列还剩余 【{}】个元素", queue.size());
                } catch (Exception e) {
                    log.error("生产者报错,失败原因 [{}]", e.getMessage(), e);
                } finally {
                    lock.unlock();
                }


            }
        }
    }


    public static void main(String[] args) {
        ProducerMode mode = new ProducerMode();
        Producer producer = mode.new Producer();
        ProducerMode.Consumer consumer = mode.new Consumer();
        producer.start();
        consumer.start();
    }
}

对应的我们给出输出结果:

CyclicBarrier

CyclicBarrier 原理和使用示例

CyclicBarrier 也就是循环栅栏对象,不是很常用,它主要用于等待线程数就绪后执行公共逻辑的业务场景。 例如我们希望每凑齐5个线程后执行后续逻辑,我们就可以说明CyclicBarrier
 数值为5,然后每个线程到期后调用await等待其他线程就绪。

一旦到齐5个,CyclicBarrier
 就会通知这些线程开始工作,对应的代码如下所示:

public static void main(String[] args) throws InterruptedException {
        int threadCount = 5;
        CyclicBarrier barrier = new CyclicBarrier(threadCount);

        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                System.out.println("线程 " + Thread.currentThread().getName() + " 开始执行任务");
                try {
                    // 模拟执行任务
                    Thread.sleep(1000);
                    System.out.println("线程 " + Thread.currentThread().getName() + " 到达屏障");
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }

                System.out.println("所有线程都到达屏障,一起继续执行");
            }).start();
        }
    }

对应的我们给出相应输出示例:

CyclicBarrier 与CountDownLatch区别(重点)

  1. CountDownLatch
    用户事件即主要是业务流程上的控制并不是针对线程,CyclicBarrier
     循环栅栏作用于线程,如上代码必须等待线程到齐后触发。
  2. 循环栅栏可重复使用,CountDownLatch
    则不能。

小结

通过本次对Java并发流程工具的实战探索,我们对Java并发编程领域有了更为深入且全面的认知。 从CountDownLatch到CyclicBarrier,再到Semaphore和Exchanger等工具,每一个都在多线程协作场景中有着独特的用途。CountDownLatch如同倒计时器,能让一组线程等待某个特定事件完成后再继续执行;CyclicBarrier则像聚会的召集者,使多个线程在特定点上汇聚,然后一起继续前行;Semaphore犹如资源的守护者,精确控制着对有限资源的访问;Exchanger为两个线程之间的数据交换提供了安全高效的通道。 在实际的代码实践中,我们看到这些工具如何巧妙地解决多线程协作中复杂的同步和通信问题,极大地提高了程序的并发处理能力和性能。不仅如此,我们还学会了根据不同的业务场景,如任务并行化、资源管理、数据交换等,选择最合适的并发流程工具,以实现最优的解决方案。 然而,Java并发编程是一个广阔且复杂的领域,这些工具在带来便利的同时,也要求我们对线程安全、资源竞争等问题保持高度警惕。在使用过程中,必须深入理解其原理和潜在风险,确保代码的正确性和稳定性。 希望本次的探索能为你在Java并发编程的道路上点亮一盏明灯,在未来面对各种并发挑战时,你能够熟练运用这些工具,编写出高效、可靠且易于维护的多线程程序,为构建更强大、更具竞争力的软件系统奠定坚实的基础。

我是 sharkchili ,CSDN Java 领域博客专家mini-redis的作者,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号: 写代码的SharkChili 。


同时也非常欢迎你star我的开源项目mini-redis:https://github.com/shark-ctrl/mini-redis

因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注  “加群”  即可和笔者和笔者的朋友们进行深入交流。

参考

控制并发流程:https://blog.csdn.net/DLC990319/article/details/106681012)


文章转载自写代码的SharkChili,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论