暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

使用动态线程池优化你的代码

程序员恰恰 2024-03-10
265


需求:用户登录后查看自己的各种统计信息,这些信息来源是不同的微服务模块。

串行化查询:
执行过程如下:假如任务1用时1s,任务2用时2s,任务3用时3s,那么查询结果总耗时6s.执行过程如下:

伪代码:

    /**
     * 任务1
     */

    static String getName() {
        try {
            // 停止执行1秒钟
            TimeUnit.SECONDS.sleep(1); // 停止执行1秒钟
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "name";
    }

    /**
     * 任务2
     * @return
     */

    static String getScore() {
        try {
            // 停止执行2秒钟
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "score";
    }
    /**
     * 任务3
     * @return
     */

    static String getOrderInfo() {
        try {
            // 停止执行3秒钟
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "orderInfo";
    }

      private static final Logger log = LoggerFactory.getLogger(CustomerController.class);

    /**
     * 串行化查询:任务1,任务2,任务3
     * @return
     */

    @RequestMapping("getAllInfoV1")
    public Boolean getAllInfoV1() {
        //开始时间
        long beginTime = System.currentTimeMillis();
        Customer customer = new Customer();
        //任务1
        customer.setCname(getName());
        //任务2
        customer.setOrderInfo(getOrderInfo());
        //任务3
        customer.setScore(getScore());
        //结束时间
        long endTime = System.currentTimeMillis();
        log.info("串行执行耗时:{}", endTime - beginTime);
        return Boolean.TRUE;
    }

执行结果:代码执行了6s.

优化方案:并行执行

可以使用CompletableFuture来并发执行多个异步任务,并在它们全部完成后进行join操作等待结果。那么这个代码的执行时间就取决于最长的业务执行的时间。

伪代码:
   /**
     * 并行查询,任务1,任务2,任务3
     *
     * @return
     */

    @RequestMapping("/getAllInfoV2")
    public Boolean getAllInfoV2() {
        //模拟使用线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10201L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(30));
        //开始时间
        long beginTime = System.currentTimeMillis();
        Customer customer = new Customer();
        //任务1
        CompletableFuture<Boolean> completableFutureGetName = CompletableFuture.supplyAsync(() -> {
            customer.setCname(getName());
            return Boolean.TRUE;
        }, threadPoolExecutor);
        //任务2
        CompletableFuture<Boolean> completableFutureGetOrderInfo = CompletableFuture.supplyAsync(() -> {
            customer.setCname(getOrderInfo());
            return Boolean.TRUE;
        }, threadPoolExecutor);
        //任务2
        CompletableFuture<Boolean> completableFutureGetScore = CompletableFuture.supplyAsync(() -> {
            customer.setCname(getScore());
            return Boolean.TRUE;
        }, threadPoolExecutor);

        //并行执行
        CompletableFuture.allOf(completableFutureGetName, completableFutureGetOrderInfo, completableFutureGetScore).join();
        //结束时间
        long endTime = System.currentTimeMillis();
        log.info("串行执行耗时:{}", endTime - beginTime);
        return Boolean.TRUE;
    }

执行结果:执行结果取决于最长的服务的执行时间,执行结果大约3s。
继续优化:使用动态线程池
上面的方式虽然使用了多线程以及线程池,但是不能实时调整线程池配置信息。可以通过nacos实现一个动态线程池。
在nacos中添加配置文件。
   core:
    size : 5
max:
    size : 10


基于 Nacos 配置的动态线程池管理功能,可以根据配置的变化来动态调整线程池的参数,同时监控线程池的状态并动态添加任务到线程池中。


核心代码如下:

package com.ruoyi.system.conf;

import com.alibaba.cloud.nacos.NacosConfigManager;
import com.alibaba.cloud.nacos.NacosConfigProperties;
import com.alibaba.nacos.api.config.listener.Listener;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@RefreshScope
@Configuration
public class DynamicThreadPool implements InitializingBean {
    @Value("${core.size}")
    private String coreSize;

    @Value("${max.size}")
    private String maxSize;

    private static ThreadPoolExecutor threadPoolExecutor;

    @Autowired
    private NacosConfigManager nacosConfigManager;

    @Autowired
    private NacosConfigProperties nacosConfigProperties;

    @Override
    public void afterPropertiesSet() throws Exception {
        //按照nacos配置初始化线程池
        threadPoolExecutor = new ThreadPoolExecutor(Integer.parseInt(coreSize), Integer.parseInt(maxSize), 10L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10),
                new ThreadFactoryBuilder().setNameFormat("c_t_%d").build(),
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        System.out.println("rejected!");
                    }
                });

        //nacos配置变更监听,ruoyi-system-dev.yml是你添加参数配置文件
        nacosConfigManager.getConfigService().addListener("ruoyi-system-dev.yml", nacosConfigProperties.getGroup(),
                new Listener() {
                    @Override
                    public Executor getExecutor() {
                        return null;
                    }
                    @Override
                    public void receiveConfigInfo(String configInfo) {
                        //配置变更,修改线程池配置
                        System.out.println(configInfo);
                        changeThreadPoolConfig(Integer.parseInt(coreSize), Integer.parseInt(maxSize));
                    }
                });
    }

    /**
     * 打印当前线程池的状态
     */

    public String printThreadPoolStatus() {
        return String.format("core_size:%s,thread_current_size:%s;" +
                        "thread_max_size:%s;queue_current_size:%s,total_task_count:%s", threadPoolExecutor.getCorePoolSize(),
                threadPoolExecutor.getActiveCount(), threadPoolExecutor.getMaximumPoolSize(), threadPoolExecutor.getQueue().size(),
                threadPoolExecutor.getTaskCount());
    }

    /**
     * 给线程池增加任务
     *
     * @param count
     */

    public void dynamicThreadPoolAddTask(int count) {
        for (int i = 0; i < count; i++) {
            int finalI = i;
            threadPoolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(finalI);
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

    /**
     * 修改线程池核心参数
     *
     * @param coreSize
     * @param maxSize
     */

    private void changeThreadPoolConfig(int coreSize, int maxSize) {
        threadPoolExecutor.setCorePoolSize(coreSize);
        threadPoolExecutor.setMaximumPoolSize(maxSize);
    }
}

如果代码报错,需要添加依赖
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>30.1-jre</version>
        </dependency>

代码解释:

@RefreshScope:这个注解用来支持nacos的动态刷新功能;

@Value("${max.size}"),@Value("${core.size}"):这两个注解用来读取我们上一步在nacos配置的具体信息;同时,nacos配置变更时,能够实时读取到变更后的内容

1、DynamicThreadPool 类实现了 InitializingBean 接口,意味着在 Bean 的属性设置后会执行 afterPropertiesSet 方法。在该方法中完成了线程池的初始化和配置监听。

2、afterPropertiesSet 方法中:创建了一个 ThreadPoolExecutor 对象 threadPoolExecutor,根据从 Nacos 配置中获取的 core.size 和 max.size 参数来初始化线程池的核心线程数和最大线程数。设置了线程池的队列、线程工厂和拒绝策略。添加了一个 Nacos 配置监听器nacosConfigManager.getConfigService().addListener,用于监听配置文件的变化。当配置信息发生变化时,会通过回调函数 receiveConfigInfo 来更新线程池的配置。

4、printThreadPoolStatus(): 这个方法用于打印当前线程池的状态,返回一个包含线程池各项状态信息的字符串,包括核心线程数、当前活动线程数、最大线程数、队列中等待的任务数以及总任务数量。5

5、dynamicThreadPoolAddTask(int count): 这个方法用于向线程池动态添加任务。根据传入的任务数量 count,循环添加指定数量的任务到线程池中。每个任务都是一个实现了 Runnable 接口的匿名内部类,在其中执行任务逻辑,这里是输出当前任务的编号并休眠10秒。
6、changeThreadPoolConfig(int coreSize, int maxSize): 这个方法用于修改线程池的核心参数,包括核心线程数和最大线程数。通过调用 setCorePoolSize() 和 setMaximumPoolSize() 方法,可以动态地修改线程池的核心线程数和最大线程数。
为了观察线程池动态变更的效果,增加Controller类测试。
@RestController
@RequestMapping("/threadpool")
public class ThreadPoolController {

    @Autowired
    private DynamicThreadPool dynamicThreadPool;

    /**
     * 打印当前线程池的状态
     */

    @GetMapping("/print")
    public String printThreadPoolStatus() {
        return dynamicThreadPool.printThreadPoolStatus();
    }

    /**
     * 给线程池增加任务
     *
     * @param count
     */

    @GetMapping("/add")
    public String dynamicThreadPoolAddTask(int count) {
        dynamicThreadPool.dynamicThreadPoolAddTask(count);
        return String.valueOf(count);
    }
}

测试:查看当前的核心线程数和最大线程数
http://localhost:9201/threadpool/print

手动更改nacos配置文件后,再次请求,控制台打印了当前的线程数信息。

通过接口查询:已经更改完成
验证有没有生效,我们通过以下接口给服务添加线程数量
http://localhost:9201/threadpool/add?count=20
这个接口的实现方法是通过new Thread方式增加线程数。

当最大线程池慢的时候就开始执行拒绝策略

这时候我们修改线程池参数修改最大线程数是200,核心线程数为6,发现没有拒绝信息了,同时控制台打印了当前的线程数量。

那么参数设置多少合适呢?对于IO密集型任务和计算密集型任务,线程池的设置略有不同:

  1. IO密集型任务:

  2. 对于IO密集型任务,通常建议设置较大的线程池大小,以便充分利用CPU等资源,同时能够处理大量的IO操作。

    可以考虑设置线程池大小为2 * CPU核心数或更大,这样可以充分利用系统资源并提高IO操作的并发处理能力。

  3. 计算密集型任务

  4. 对于计算密集型任务,由于任务主要耗费在CPU计算上,因此需要限制线程池的大小,避免过多线程竞争CPU资源而导致性能下降。

    建议将线程池的大小设置为CPU核心数加1或2,这样可以充分利用CPU资源而又不至于引起过多的线程切换导致性能损失。



文章转载自程序员恰恰,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论