From 52122725570cc80b9cfbe99ab25f45c58d12165e Mon Sep 17 00:00:00 2001 From: lxu75 Date: Sun, 25 May 2025 18:59:59 +0800 Subject: [PATCH] =?UTF-8?q?fix=20=E7=BA=BF=E7=A8=8B=E6=B1=A0=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../center/config/AsyncTaskExecutePool.java | 68 ------------------- .../impl/MqMessageRecordServiceImpl.java | 15 ++-- 2 files changed, 4 insertions(+), 79 deletions(-) delete mode 100644 ai-analytic-center-biz/src/main/java/com/volvo/ai/analytic/center/config/AsyncTaskExecutePool.java diff --git a/ai-analytic-center-biz/src/main/java/com/volvo/ai/analytic/center/config/AsyncTaskExecutePool.java b/ai-analytic-center-biz/src/main/java/com/volvo/ai/analytic/center/config/AsyncTaskExecutePool.java deleted file mode 100644 index 51254a4..0000000 --- a/ai-analytic-center-biz/src/main/java/com/volvo/ai/analytic/center/config/AsyncTaskExecutePool.java +++ /dev/null @@ -1,68 +0,0 @@ -package com.volvo.ai.analytic.center.config; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.scheduling.annotation.AsyncConfigurer; -import org.springframework.scheduling.annotation.EnableAsync; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.stereotype.Component; - -import java.util.concurrent.*; - -/** - * 异步任务线程池装配类 - * @author gubin - * @date 2022-04-14 - */ -@EnableAsync -@Slf4j -@Component -public class AsyncTaskExecutePool implements AsyncConfigurer { - - @Value("${task.pool.corePoolSize}") - private int corePoolSize; - - @Value("${task.pool.maxPoolSize}") - private int maxPoolSize; - - @Value("${task.pool.queueCapacity}") - private int queueCapacity; - - @Value("${task.pool.keepAliveSeconds}") - private int keepAliveSeconds; - - - @Bean - @Override - public Executor getAsyncExecutor() { - ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - //核心线程池大小 - executor.setCorePoolSize(corePoolSize); - //最大线程数 - executor.setMaxPoolSize(maxPoolSize); - //队列容量 - executor.setQueueCapacity(queueCapacity); - //活跃时间 - executor.setKeepAliveSeconds(keepAliveSeconds); - //线程名字前缀 - executor.setThreadNamePrefix("async-task-"); - // setRejectedExecutionHandler:当pool已经达到max size的时候,如何处理新任务 - // CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行 - executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); - executor.initialize(); - return executor; - } - - @Override - public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { - return (throwable, method, objects) -> { - log.error("===="+throwable.getMessage()+"====", throwable); - log.error("exception method:"+method.getName()); - }; - } - - - -} diff --git a/ai-analytic-center-biz/src/main/java/com/volvo/ai/analytic/center/service/impl/MqMessageRecordServiceImpl.java b/ai-analytic-center-biz/src/main/java/com/volvo/ai/analytic/center/service/impl/MqMessageRecordServiceImpl.java index 5e2ce92..14236fb 100644 --- a/ai-analytic-center-biz/src/main/java/com/volvo/ai/analytic/center/service/impl/MqMessageRecordServiceImpl.java +++ b/ai-analytic-center-biz/src/main/java/com/volvo/ai/analytic/center/service/impl/MqMessageRecordServiceImpl.java @@ -156,23 +156,16 @@ public class MqMessageRecordServiceImpl extends ServiceImpl caseWorkFlow = CompletableFuture.supplyAsync(() -> callCaseCommunityWorkFlow(difyCommunityTargetDTO,user, caseToken),getAsyncExecutor); + JSONObject caseResult = callCaseCommunityWorkFlow(difyCommunityTargetDTO,user, caseToken); // 内容主题关键词打标 - CompletableFuture keywordWorkFlow = CompletableFuture.supplyAsync(() -> callCommunityWorkFlow(difyCommunityTargetDTO,user, keywordToken),getAsyncExecutor); + JSONObject keywordResult = callCommunityWorkFlow(difyCommunityTargetDTO,user, keywordToken); // litecrm线索分析 - CompletableFuture clueAnalysisWorkFlow = CompletableFuture.supplyAsync(() -> callCommunityWorkFlow(difyCommunityTargetDTO,user, clueAnalysisToken),getAsyncExecutor); + JSONObject clueAnalysisResult = callCommunityWorkFlow(difyCommunityTargetDTO,user, clueAnalysisToken); - CompletableFuture allFutures = CompletableFuture.allOf(caseWorkFlow, keywordWorkFlow, clueAnalysisWorkFlow); - // 等待所有API调用完成 - allFutures.get(); - // 获取各个API的结果 - JSONObject caseResult = caseWorkFlow.get(); - JSONObject keywordResult = keywordWorkFlow.get(); - JSONObject clueAnalysisResult = clueAnalysisWorkFlow.get(); difyResult.add(caseResult); difyResult.add(keywordResult); difyResult.add(clueAnalysisResult);