Spring @Async 异步线程池选择
故障
上线一个与下述定时任务无关的新需求,定时任务流程陷入卡死状态,具体表现为:
•提交 2 条 任务后,任务未执行且无任何报错日志,系统阻塞无响应
•线程池监控数据显示:业务线程池ocrTaskExecutor活跃线程数 = 2(已达最大线程数上限),队列等待任务数持续增加至 10+
•业务日志仅打印任务提交记录,无任务执行痕迹
任务执行源码:
log.info("{} task start handle, task size:{}", LOG_PRE, taskQueueDOList.size());
// 使用线程池并行处理任务 executorService即为ocrTaskExecutor
List<Future<TaskQueueDO>> futures = taskQueueDOList.stream()
.map(taskQueueDO -> executorService.submit(() -> getData(taskQueueDO)))
.collect(Collectors.toList());
int successCount = 0;
int errorCount = 0;
// 收集处理结果
for (int i = 0; i < futures.size(); i++) {
try {
TaskQueueDO updatedTaskQueueDO = futures.get(i).get();
taskQueueDOList.set(i, updatedTaskQueueDO);
successCount++;
log.info("{} 算法处理中,successCount:{}, errorCount:{}", LOG_PRE, successCount, errorCount);
} catch (Exception e) {
log.error("{}Error processing task", LOG_PRE, e);
// 设置任务状态为失败
taskQueueDOList.get(i).setStatus(TaskStatusEnum.FAILED.getCode());
errorCount++;
}
}
通过jstack命令打印线程堆栈信息:
•业务线程池`ocrTaskExecutor`的 2 个核心线程均处于`RUNNABLE`状态,但执行的并非 OCR 业务逻辑
•线程堆栈显示,这两个线程正在执行 Kafka 消息消费逻辑,具体阻塞在`KafkaConsumer.poll(Duration.ofMillis(1000))`方法
•Kafka 消费任务通过`@Async`注解实现异步执行,但未显式指定线程池
配置分析
系统通过ThreadPoolConfig类配置了 4 个线程池,用于不同业务场景,核心配置如下:
@Slf4j
@EnableAsync
@Configuration
public class ThreadPoolConfig {
// 业务线程池:OCR识别任务专用
@Bean(name = "ocrTaskExecutor")
public ThreadPoolTaskExecutor getOCRTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(2);
executor.setQueueCapacity(1000);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("OcrTaskExecutor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
// 路由计算线程池
@Bean(name = "routeTaskExecutor")
public ExecutorService getRouteTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(16);
executor.setMaxPoolSize(32);
executor.setQueueCapacity(2000);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("RouteTaskExecutor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor.getThreadPoolExecutor();
}
// WiFi上报线程池
@Bean(name = "wifiReportExecutor")
public ExecutorService getWifiReportExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(64);
executor.setMaxPoolSize(128);
executor.setQueueCapacity(20000);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("WifiReportExecutor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor.getThreadPoolExecutor();
}
// 仓储数据同步线程池
@Bean(name = "warehouseData2MasterExecutor")
public ExecutorService getWarehouseData2MasterExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(2000);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("WarehouseData2MasterExecutor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor.getThreadPoolExecutor();
}
}
Kafka 消息消费通过KafkaConsumerProxy类实现,核心代码如下:
@Slf4j
@Configuration
@Scope("prototype")
public class KafkaConsumerProxy {
// 未指定线程池的@Async注解
@Async
public void subscribe(String topic, Properties properties, MessageListener listener) {
log.info("消息订阅初始化, topic={}", topic);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Collections.singletonList(topic));
// 无限循环消费消息
while (true) {
try {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
List<ConsumerRecord<String, String>> records = new ArrayList<>();
for (ConsumerRecord<String, String> record : consumerRecords) {
records.add(record);
}
if (CollUtil.isNotEmpty(records)) {
listener.onMessage(records);
}
} catch (org.apache.kafka.common.errors.InterruptException e) {
log.error("kafka消费线程中断异常, topic={}", topic, e);
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error("kafka消费发生异常, topic={}", topic, e);
}
}
}
}
•4 个线程池配置中,仅`ocrTaskExecutor`直接返回`ThreadPoolTaskExecutor`类型(实现 Spring 的`TaskExecutor`接口)
•其余 3 个线程池均返回`ExecutorService`类型(实际为 JDK 的`ThreadPoolExecutor`,未实现`TaskExecutor`接口)
•Kafka 消费的`@Async`注解未指定线程池名称,触发 Spring 的默认线程池选择逻辑 ### 选择机制 AsyncExecutionAspectSupport类位于org.springframework.aop.interceptor包下,核心职责是为异步方法确定合适的执行线程池,关键方法包括:
•determineAsyncExecutor(Method method):线程池选择的入口方法
•getDefaultExecutor(BeanFactory beanFactory):默认线程池获取逻辑
•findQualifiedExecutor(BeanFactory beanFactory, String qualifier):指定线程池名称时的查找逻辑 #### 入口方法:determineAsyncExecutor
该方法是线程池选择的总入口,负责缓存复用、线程池查找与适配,源码如下:
@Nullable
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
// 1. 先从缓存中获取当前方法已绑定的线程池(避免重复查找)
AsyncTaskExecutor executor = (AsyncTaskExecutor)this.executors.get(method);
if (executor == null) {
// 2. 获取@Async注解中指定的线程池名称(qualifier)
String qualifier = this.getExecutorQualifier(method);
Executor targetExecutor;
if (StringUtils.hasLength(qualifier)) {
// 3. 若指定了线程池名称,根据名称查找
targetExecutor = this.findQualifiedExecutor(this.beanFactory, qualifier);
} else {
// 4. 未指定名称,获取默认线程池
targetExecutor = (Executor)this.defaultExecutor.get();
}
if (targetExecutor == null) {
return null;
}
// 5. 将找到的Executor适配为Spring异步任务执行器(AsyncTaskExecutor)
executor = (AsyncTaskExecutor)(targetExecutor instanceof AsyncListenableTaskExecutor ?
(AsyncListenableTaskExecutor)targetExecutor : new TaskExecutorAdapter(targetExecutor));
// 6. 缓存当前方法与线程池的映射关系
this.executors.put(method, executor);
}
return executor;
}
注解指定线程池:findQualifiedExecutor
当@Async注解指定线程池名称(如@Async(“kafkaExecutor”))时,通过该方法根据名称查找线程池,核心逻辑是调用BeanFactoryAnnotationUtils.qualifiedBeanOfType从 Spring 容器中获取名称匹配的Executor类型 Bean:
@Nullable
protected Executor findQualifiedExecutor(@Nullable BeanFactory beanFactory, String qualifier) {
if (beanFactory == null) {
throw new IllegalStateException("BeanFactory must be set on " + this.getClass().getSimpleName() + " to access qualified executor '" + qualifier + "'");
} else {
// 根据名称和类型查找线程池Bean
return (Executor)BeanFactoryAnnotationUtils.qualifiedBeanOfType(beanFactory, Executor.class, qualifier);
}
}
默认线程池获取:getDefaultExecutor(核心逻辑)
当@Async未指定线程池名称时,默认线程池由该方法获取,这是本次故障的关键所在。源码如下:
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
if (beanFactory != null) {
try {
// 第一步:优先查找类型为TaskExecutor的Bean
return (Executor)beanFactory.getBean(TaskExecutor.class);
} catch (NoUniqueBeanDefinitionException ex) {
// 存在多个TaskExecutor类型Bean,进入异常处理
this.logger.debug("Could not find unique TaskExecutor bean. Continuing search for an Executor bean named 'taskExecutor'", ex);
try {
// 第二步:尝试查找名称为"taskExecutor"的Bean
return (Executor)beanFactory.getBean("taskExecutor", Executor.class);
} catch (NoSuchBeanDefinitionException var4) {
if (this.logger.isInfoEnabled()) {
this.logger.info("More than one TaskExecutor bean found within the context, and none is named 'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
}
}
} catch (NoSuchBeanDefinitionException ex) {
// 无TaskExecutor类型Bean,进入异常处理
this.logger.debug("Could not find default TaskExecutor bean. Continuing search for an Executor bean named 'taskExecutor'", ex);
try {
// 第二步:尝试查找名称为"taskExecutor"的Bean
return (Executor)beanFactory.getBean("taskExecutor", Executor.class);
} catch (NoSuchBeanDefinitionException var5) {
this.logger.info("No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either");
}
}
}
// 所有查找失败,返回null
return null;
}
@Async注解选择线程池的完整优先级规则(从高到低):
1.缓存优先:若当前方法之前已选择过线程池,直接复用缓存中的线程池(executors集合存储)
2.显式指定:若@Async("threadPoolName")指定了线程池名称,优先根据名称查找容器中对应的Executor类型 Bean
3.类型匹配(TaskExecutor):未指定名称时,优先查找容器中**类型为TaskExecutor的唯一 Bean**(ThreadPoolTaskExecutor是其实现类)
4.名称匹配(taskExecutor):若存在多个TaskExecutor类型 Bean,或无TaskExecutor类型 Bean,尝试查找**名称为taskExecutor的Executor类型 Bean**(DEFAULT_TASK_EXECUTOR_BEAN_NAME常量定义)
5.默认兜底:以上均未找到时,defaultExecutor返回null,最终 Spring 会使用内置的SimpleAsyncTaskExecutor
结合源码规则与系统配置,故障原因:
1.Kafka 消费的@Async注解未指定线程池名称,触发默认线程池查找逻辑
2.系统中仅ocrTaskExecutor是TaskExecutor类型 Bean(其余 3 个为 JDK 的ExecutorService),满足 “类型匹配(TaskExecutor)” 规则
3.Spring 通过getDefaultExecutor方法找到唯一的TaskExecutor类型 Bean(ocrTaskExecutor),并将其作为 Kafka 消费任务的执行线程池
4.Kafka 消费逻辑采用while (true)无限循环,持续占用ocrTaskExecutor的核心线程(最大 2 个),导致后续提交的 OCR 业务任务无法获取线程资源,只能进入队列等待,最终系统卡死 ### 多 TaskExecutor 类型 Bean 的选择逻辑 若系统中所有线程池 Bean 均为ThreadPoolTaskExecutor类型(均实现TaskExecutor接口)
假设所有线程池均配置为ThreadPoolTaskExecutor类型:
@Bean(name = "ocrTaskExecutor")
public ThreadPoolTaskExecutor ocrTaskExecutor() { ... }
@Bean(name = "routeTaskExecutor")
public ThreadPoolTaskExecutor routeTaskExecutor() { ... }
@Bean(name = "wifiReportExecutor")
public ThreadPoolTaskExecutor wifiReportExecutor() { ... }
@Bean(name = "warehouseData2MasterExecutor")
public ThreadPoolTaskExecutor warehouseData2MasterExecutor() { ... }
线程池选择流程
getDefaultExecutor方法查找TaskExecutor类型 Bean 时,发现多个实例,抛出NoUniqueBeanDefinitionException
进入异常处理逻辑,尝试查找名称为taskExecutor的Executor类型 Bean
无taskExecutor名称 Bean 时,所有未指定线程池的@Async任务均使用SimpleAsyncTaskExecutor(每次新建线程,无资源控制)
潜在风险
SimpleAsyncTaskExecutor无线程复用机制,默认无限创建新线程,高并发场景下可能导致:
•线程数量暴增,CPU 上下文切换频繁,系统性能急剧下降
•内存资源耗尽,触发 OOM 异常
•线程泄漏,长期占用系统资源
解决方案:异步线程池规范化配置与隔离
显式指定、功能隔离、可控配置
为不同类型的异步任务配置专属线程池,通过@Async显式指定,彻底避免资源竞争。
新增 Kafka 消费专属线程池
在ThreadPoolConfig中添加 Kafka 消费专用线程池:
// Kafka消息消费专属线程池
@Bean(name = "kafkaConsumerExecutor")
public ThreadPoolTaskExecutor kafkaConsumerExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数:根据Kafka分区数和消费能力配置
executor.setCorePoolSize(3);
// 最大线程数:核心线程数的1.5~2倍
executor.setMaxPoolSize(5);
// 队列容量:适中配置,避免任务过度积压
executor.setQueueCapacity(500);
// 空闲线程存活时间:60秒
executor.setKeepAliveSeconds(60);
// 线程名称前缀:便于日志追踪和问题定位
executor.setThreadNamePrefix("KafkaConsumer-");
// 拒绝策略:队列满时由提交线程执行,避免任务丢失
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化线程池
executor.initialize();
return executor;
}
显式指定线程池
修改 Kafka 消费代码,通过@Async注解显式指定专属线程池:
@Async("kafkaConsumerExecutor") // 显式指定Kafka消费线程池
public void subscribe(String topic, Properties properties, MessageListener listener) {
// 原有消费逻辑不变
log.info("消息订阅初始化, topic={}", topic);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Collections.singletonList(topic));
while (true) {