

伪代码:
/**
* 任务1
*/
static String getName() {
try {
// 停止执行1秒钟
TimeUnit.SECONDS.sleep(1); // 停止执行1秒钟
} catch (InterruptedException e) {
e.printStackTrace();
}
return "name";
}
/**
* 任务2
* @return
*/
static String getScore() {
try {
// 停止执行2秒钟
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "score";
}
/**
* 任务3
* @return
*/
static String getOrderInfo() {
try {
// 停止执行3秒钟
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "orderInfo";
}
private static final Logger log = LoggerFactory.getLogger(CustomerController.class);
/**
* 串行化查询:任务1,任务2,任务3
* @return
*/
@RequestMapping("getAllInfoV1")
public Boolean getAllInfoV1() {
//开始时间
long beginTime = System.currentTimeMillis();
Customer customer = new Customer();
//任务1
customer.setCname(getName());
//任务2
customer.setOrderInfo(getOrderInfo());
//任务3
customer.setScore(getScore());
//结束时间
long endTime = System.currentTimeMillis();
log.info("串行执行耗时:{}", endTime - beginTime);
return Boolean.TRUE;
}
执行结果:代码执行了6s.

优化方案:并行执行
可以使用CompletableFuture来并发执行多个异步任务,并在它们全部完成后进行join操作等待结果。那么这个代码的执行时间就取决于最长的业务执行的时间。

/**
* 并行查询,任务1,任务2,任务3
*
* @return
*/
@RequestMapping("/getAllInfoV2")
public Boolean getAllInfoV2() {
//模拟使用线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(30));
//开始时间
long beginTime = System.currentTimeMillis();
Customer customer = new Customer();
//任务1
CompletableFuture<Boolean> completableFutureGetName = CompletableFuture.supplyAsync(() -> {
customer.setCname(getName());
return Boolean.TRUE;
}, threadPoolExecutor);
//任务2
CompletableFuture<Boolean> completableFutureGetOrderInfo = CompletableFuture.supplyAsync(() -> {
customer.setCname(getOrderInfo());
return Boolean.TRUE;
}, threadPoolExecutor);
//任务2
CompletableFuture<Boolean> completableFutureGetScore = CompletableFuture.supplyAsync(() -> {
customer.setCname(getScore());
return Boolean.TRUE;
}, threadPoolExecutor);
//并行执行
CompletableFuture.allOf(completableFutureGetName, completableFutureGetOrderInfo, completableFutureGetScore).join();
//结束时间
long endTime = System.currentTimeMillis();
log.info("串行执行耗时:{}", endTime - beginTime);
return Boolean.TRUE;
}


core:
size : 5
max:
size : 10
基于 Nacos 配置的动态线程池管理功能,可以根据配置的变化来动态调整线程池的参数,同时监控线程池的状态并动态添加任务到线程池中。
核心代码如下:
package com.ruoyi.system.conf;
import com.alibaba.cloud.nacos.NacosConfigManager;
import com.alibaba.cloud.nacos.NacosConfigProperties;
import com.alibaba.nacos.api.config.listener.Listener;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@RefreshScope
@Configuration
public class DynamicThreadPool implements InitializingBean {
@Value("${core.size}")
private String coreSize;
@Value("${max.size}")
private String maxSize;
private static ThreadPoolExecutor threadPoolExecutor;
@Autowired
private NacosConfigManager nacosConfigManager;
@Autowired
private NacosConfigProperties nacosConfigProperties;
@Override
public void afterPropertiesSet() throws Exception {
//按照nacos配置初始化线程池
threadPoolExecutor = new ThreadPoolExecutor(Integer.parseInt(coreSize), Integer.parseInt(maxSize), 10L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10),
new ThreadFactoryBuilder().setNameFormat("c_t_%d").build(),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("rejected!");
}
});
//nacos配置变更监听,ruoyi-system-dev.yml是你添加参数配置文件
nacosConfigManager.getConfigService().addListener("ruoyi-system-dev.yml", nacosConfigProperties.getGroup(),
new Listener() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(String configInfo) {
//配置变更,修改线程池配置
System.out.println(configInfo);
changeThreadPoolConfig(Integer.parseInt(coreSize), Integer.parseInt(maxSize));
}
});
}
/**
* 打印当前线程池的状态
*/
public String printThreadPoolStatus() {
return String.format("core_size:%s,thread_current_size:%s;" +
"thread_max_size:%s;queue_current_size:%s,total_task_count:%s", threadPoolExecutor.getCorePoolSize(),
threadPoolExecutor.getActiveCount(), threadPoolExecutor.getMaximumPoolSize(), threadPoolExecutor.getQueue().size(),
threadPoolExecutor.getTaskCount());
}
/**
* 给线程池增加任务
*
* @param count
*/
public void dynamicThreadPoolAddTask(int count) {
for (int i = 0; i < count; i++) {
int finalI = i;
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println(finalI);
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
/**
* 修改线程池核心参数
*
* @param coreSize
* @param maxSize
*/
private void changeThreadPoolConfig(int coreSize, int maxSize) {
threadPoolExecutor.setCorePoolSize(coreSize);
threadPoolExecutor.setMaximumPoolSize(maxSize);
}
}
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1-jre</version>
</dependency>
@RefreshScope:这个注解用来支持nacos的动态刷新功能;
@Value("${max.size}"),@Value("${core.size}"):这两个注解用来读取我们上一步在nacos配置的具体信息;同时,nacos配置变更时,能够实时读取到变更后的内容
1、DynamicThreadPool 类实现了 InitializingBean 接口,意味着在 Bean 的属性设置后会执行 afterPropertiesSet 方法。在该方法中完成了线程池的初始化和配置监听。
2、在 afterPropertiesSet 方法中:创建了一个 ThreadPoolExecutor 对象 threadPoolExecutor,根据从 Nacos 配置中获取的 core.size 和 max.size 参数来初始化线程池的核心线程数和最大线程数。设置了线程池的队列、线程工厂和拒绝策略。添加了一个 Nacos 配置监听器nacosConfigManager.getConfigService().addListener,用于监听配置文件的变化。当配置信息发生变化时,会通过回调函数 receiveConfigInfo 来更新线程池的配置。
4、printThreadPoolStatus(): 这个方法用于打印当前线程池的状态,返回一个包含线程池各项状态信息的字符串,包括核心线程数、当前活动线程数、最大线程数、队列中等待的任务数以及总任务数量。5
@RestController
@RequestMapping("/threadpool")
public class ThreadPoolController {
@Autowired
private DynamicThreadPool dynamicThreadPool;
/**
* 打印当前线程池的状态
*/
@GetMapping("/print")
public String printThreadPoolStatus() {
return dynamicThreadPool.printThreadPoolStatus();
}
/**
* 给线程池增加任务
*
* @param count
*/
@GetMapping("/add")
public String dynamicThreadPoolAddTask(int count) {
dynamicThreadPool.dynamicThreadPoolAddTask(count);
return String.valueOf(count);
}
}
http://localhost:9201/threadpool/print



http://localhost:9201/threadpool/add?count=20

当最大线程池慢的时候就开始执行拒绝策略

这时候我们修改线程池参数修改最大线程数是200,核心线程数为6,发现没有拒绝信息了,同时控制台打印了当前的线程数量。

那么参数设置多少合适呢?对于IO密集型任务和计算密集型任务,线程池的设置略有不同:
IO密集型任务:
计算密集型任务:
对于IO密集型任务,通常建议设置较大的线程池大小,以便充分利用CPU等资源,同时能够处理大量的IO操作。
可以考虑设置线程池大小为2 * CPU核心数或更大,这样可以充分利用系统资源并提高IO操作的并发处理能力。
对于计算密集型任务,由于任务主要耗费在CPU计算上,因此需要限制线程池的大小,避免过多线程竞争CPU资源而导致性能下降。
建议将线程池的大小设置为CPU核心数加1或2,这样可以充分利用CPU资源而又不至于引起过多的线程切换导致性能损失。






