fix 线程池问题

This commit is contained in:
lxu75
2025-05-25 18:59:59 +08:00
parent 106e34fbd7
commit 5212272557
2 changed files with 4 additions and 79 deletions

View File

@@ -1,68 +0,0 @@
package com.volvo.ai.analytic.center.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.util.concurrent.*;
/**
* 异步任务线程池装配类
* @author gubin
* @date 2022-04-14
*/
@EnableAsync
@Slf4j
@Component
public class AsyncTaskExecutePool implements AsyncConfigurer {
@Value("${task.pool.corePoolSize}")
private int corePoolSize;
@Value("${task.pool.maxPoolSize}")
private int maxPoolSize;
@Value("${task.pool.queueCapacity}")
private int queueCapacity;
@Value("${task.pool.keepAliveSeconds}")
private int keepAliveSeconds;
@Bean
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心线程池大小
executor.setCorePoolSize(corePoolSize);
//最大线程数
executor.setMaxPoolSize(maxPoolSize);
//队列容量
executor.setQueueCapacity(queueCapacity);
//活跃时间
executor.setKeepAliveSeconds(keepAliveSeconds);
//线程名字前缀
executor.setThreadNamePrefix("async-task-");
// setRejectedExecutionHandler当pool已经达到max size的时候如何处理新任务
// CallerRunsPolicy不在新线程中执行任务而是由调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (throwable, method, objects) -> {
log.error("===="+throwable.getMessage()+"====", throwable);
log.error("exception method:"+method.getName());
};
}
}

View File

@@ -156,23 +156,16 @@ public class MqMessageRecordServiceImpl extends ServiceImpl<MqMessageRecordMappe
return true; return true;
} }
private void processDify(DifyCommunityTargetDTO difyCommunityTargetDTO ,String user, JSONArray difyResult, String aiAnalysisRequestId) throws InterruptedException, ExecutionException { private void processDify(DifyCommunityTargetDTO difyCommunityTargetDTO ,String user, JSONArray difyResult, String aiAnalysisRequestId) {
//舆情案件分析 //舆情案件分析
CompletableFuture<JSONObject> caseWorkFlow = CompletableFuture.supplyAsync(() -> callCaseCommunityWorkFlow(difyCommunityTargetDTO,user, caseToken),getAsyncExecutor); JSONObject caseResult = callCaseCommunityWorkFlow(difyCommunityTargetDTO,user, caseToken);
// 内容主题关键词打标 // 内容主题关键词打标
CompletableFuture<JSONObject> keywordWorkFlow = CompletableFuture.supplyAsync(() -> callCommunityWorkFlow(difyCommunityTargetDTO,user, keywordToken),getAsyncExecutor); JSONObject keywordResult = callCommunityWorkFlow(difyCommunityTargetDTO,user, keywordToken);
// litecrm线索分析 // litecrm线索分析
CompletableFuture<JSONObject> clueAnalysisWorkFlow = CompletableFuture.supplyAsync(() -> callCommunityWorkFlow(difyCommunityTargetDTO,user, clueAnalysisToken),getAsyncExecutor); JSONObject clueAnalysisResult = callCommunityWorkFlow(difyCommunityTargetDTO,user, clueAnalysisToken);
CompletableFuture<Void> allFutures = CompletableFuture.allOf(caseWorkFlow, keywordWorkFlow, clueAnalysisWorkFlow);
// 等待所有API调用完成
allFutures.get();
// 获取各个API的结果
JSONObject caseResult = caseWorkFlow.get();
JSONObject keywordResult = keywordWorkFlow.get();
JSONObject clueAnalysisResult = clueAnalysisWorkFlow.get();
difyResult.add(caseResult); difyResult.add(caseResult);
difyResult.add(keywordResult); difyResult.add(keywordResult);
difyResult.add(clueAnalysisResult); difyResult.add(clueAnalysisResult);