From 5d7a788737037c57ead6892520c5f4c76ac80117 Mon Sep 17 00:00:00 2001 From: ZLI263 Date: Sat, 20 Sep 2025 20:21:44 +0800 Subject: [PATCH] =?UTF-8?q?=E9=93=AD=E7=89=8C=E5=9C=BA=E6=99=AF=E7=9A=84?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=E9=87=8D=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../center/enums/BusinessTypeEnum.java | 5 + .../analytic/center/enums/CategoryEnum.java | 8 +- .../ai/analytic/center/job/CorpusFailJob.java | 3 +- .../center/mq/NameplateKafkaConsumer.java | 2 +- .../service/TmNameplateCorpusService.java | 2 +- .../impl/TmNameplateCorpusServiceImpl.java | 207 ++++++++++++++---- 6 files changed, 181 insertions(+), 46 deletions(-) diff --git a/ai-analytic-center-api/src/main/java/com/volvo/ai/analytic/center/enums/BusinessTypeEnum.java b/ai-analytic-center-api/src/main/java/com/volvo/ai/analytic/center/enums/BusinessTypeEnum.java index 6300391..06f5530 100644 --- a/ai-analytic-center-api/src/main/java/com/volvo/ai/analytic/center/enums/BusinessTypeEnum.java +++ b/ai-analytic-center-api/src/main/java/com/volvo/ai/analytic/center/enums/BusinessTypeEnum.java @@ -40,6 +40,11 @@ public enum BusinessTypeEnum { SPOKESMAN_SENSITIVE_WORD("SPOKESMAN_SENSITIVE_WORD", "特邀发言官敏感词"), //特邀发言官审核规则 SPOKESMAN_AUDIT_RULE("SPOKESMAN_AUDIT_RULE", "特邀发言官审核规则"), + + CORPUS_PORTRAIT_DCC("CORPUS_PORTRAIT_DCC", "智能助手-画像-DCC"), + CORPUS_PORTRAIT_QIWEI("CORPUS_PORTRAIT_QIWEI", "智能助手-画像-企微"), + CORPUS_PORTRAIT_NAMEPLATE("CORPUS_PORTRAIT_NAMEPLATE", "智能助手-画像-铭牌"), + ; private String code; diff --git a/ai-analytic-center-api/src/main/java/com/volvo/ai/analytic/center/enums/CategoryEnum.java b/ai-analytic-center-api/src/main/java/com/volvo/ai/analytic/center/enums/CategoryEnum.java index b280381..1235635 100644 --- a/ai-analytic-center-api/src/main/java/com/volvo/ai/analytic/center/enums/CategoryEnum.java +++ b/ai-analytic-center-api/src/main/java/com/volvo/ai/analytic/center/enums/CategoryEnum.java @@ -4,10 +4,16 @@ public enum CategoryEnum { ENTERPRISE_WECHAT("enterprise_wechat", "企微记录"), PHONE_VOICE("phone_voice", "语音电话"), - + PORTRAIT_ALLIN("portrait_allin", "语音电话,用户画像"), NAMEPLATE_VOICE("nameplate_voice", "铭牌"), + NAMEPLATE_VOICE_PORTRAIT("nameplate_voice_portrait", "铭牌客户画像"), OTHER("未知", "未知记录"), + + CORPUS_PORTRAIT_DCC("CORPUS_PORTRAIT_DCC", "智能助手-画像-DCC"), + CORPUS_PORTRAIT_QIWEI("CORPUS_PORTRAIT_QIWEI", "智能助手-画像-企微"), + CORPUS_PORTRAIT_NAMEPLATE("CORPUS_PORTRAIT_NAMEPLATE", "智能助手-画像-铭牌"), + ; private String code; private String message; diff --git a/ai-analytic-center-biz/src/main/java/com/volvo/ai/analytic/center/job/CorpusFailJob.java b/ai-analytic-center-biz/src/main/java/com/volvo/ai/analytic/center/job/CorpusFailJob.java index 638f9f0..791f3e5 100644 --- a/ai-analytic-center-biz/src/main/java/com/volvo/ai/analytic/center/job/CorpusFailJob.java +++ b/ai-analytic-center-biz/src/main/java/com/volvo/ai/analytic/center/job/CorpusFailJob.java @@ -139,7 +139,8 @@ public class CorpusFailJob { List nameplateCorpusList = tmNameplateCorpusService.queryTelephoneCorpusByCustomerFlowId( Arrays.asList(corpusReportDTO.getCustomerFlowId())); if(CollectionUtils.isNotEmpty(nameplateCorpusList)) { TmNameplateCorpus nameplateCorpus = nameplateCorpusList.get(0); - tmNameplateCorpusService.sendNameplateLto(execDifyFlow,oldAiAnalysisRequestLogs.getAiAnalysisRequestId(), nameplateCorpus); + tmNameplateCorpusService.sendNameplateLto(execDifyFlow,oldAiAnalysisRequestLogs.getAiAnalysisRequestId(), + nameplateCorpus, BusinessTypeEnum.SMART_ASSISTANT_NAMEPLATE.getCode()); } }else { List dccDtoList = tmTelephoneCorpusMapper.queryTelephoneCorpusBySourceIds( Arrays.asList(corpusReportDTO.getRecordId())); diff --git a/ai-analytic-center-biz/src/main/java/com/volvo/ai/analytic/center/mq/NameplateKafkaConsumer.java b/ai-analytic-center-biz/src/main/java/com/volvo/ai/analytic/center/mq/NameplateKafkaConsumer.java index c6a3cf0..5f968f5 100644 --- a/ai-analytic-center-biz/src/main/java/com/volvo/ai/analytic/center/mq/NameplateKafkaConsumer.java +++ b/ai-analytic-center-biz/src/main/java/com/volvo/ai/analytic/center/mq/NameplateKafkaConsumer.java @@ -56,7 +56,7 @@ public class NameplateKafkaConsumer { } try { NameplateTableKafkaDTO tmNameplateCorpus = JSON.parseObject(recordMessages, NameplateTableKafkaDTO.class); - if (!"INSERT".equals(tmNameplateCorpus.getType()) || CollectionUtils.isEmpty(tmNameplateCorpus.getData())) { + if (!"INSERT".equals(tmNameplateCorpus.getType()) || !"UPDATE".equals(tmNameplateCorpus.getType()) || CollectionUtils.isEmpty(tmNameplateCorpus.getData())) { ack.acknowledge(); return; } diff --git a/ai-analytic-center-biz/src/main/java/com/volvo/ai/analytic/center/service/TmNameplateCorpusService.java b/ai-analytic-center-biz/src/main/java/com/volvo/ai/analytic/center/service/TmNameplateCorpusService.java index 2bd98cb..9a96c87 100644 --- a/ai-analytic-center-biz/src/main/java/com/volvo/ai/analytic/center/service/TmNameplateCorpusService.java +++ b/ai-analytic-center-biz/src/main/java/com/volvo/ai/analytic/center/service/TmNameplateCorpusService.java @@ -17,7 +17,7 @@ public interface TmNameplateCorpusService extends IService { void processItem(TmNameplateCorpus item); - void sendNameplateLto(JSONObject execDifyFlow,String aiAnalysisRequestId, TmNameplateCorpus tmNameplateCorpus); + void sendNameplateLto(JSONObject execDifyFlow, String aiAnalysisRequestId, TmNameplateCorpus tmNameplateCorpus, String businessType); ResultMsg updateNameplate(String message); diff --git a/ai-analytic-center-biz/src/main/java/com/volvo/ai/analytic/center/service/impl/TmNameplateCorpusServiceImpl.java b/ai-analytic-center-biz/src/main/java/com/volvo/ai/analytic/center/service/impl/TmNameplateCorpusServiceImpl.java index 36dd015..3901cd6 100644 --- a/ai-analytic-center-biz/src/main/java/com/volvo/ai/analytic/center/service/impl/TmNameplateCorpusServiceImpl.java +++ b/ai-analytic-center-biz/src/main/java/com/volvo/ai/analytic/center/service/impl/TmNameplateCorpusServiceImpl.java @@ -22,13 +22,16 @@ import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.context.config.annotation.RefreshScope; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; +import javax.annotation.Resource; import java.time.LocalDate; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; /** @@ -65,6 +68,20 @@ public class TmNameplateCorpusServiceImpl extends ServiceImpl tmNameplateCorpus.getCustomerFlowId()!=null && tmNameplateCorpus.getNameplateContent()!=null).orElseThrow(()->new RuntimeException("铭牌语料为空")); + Optional.ofNullable(item).filter(tmNameplateCorpus -> tmNameplateCorpus.getCustomerFlowId() != null && tmNameplateCorpus.getNameplateContent() != null).orElseThrow(() -> new RuntimeException("铭牌语料为空")); String customerFlowId = item.getCustomerFlowId(); String nameplateContent = item.getNameplateContent(); @@ -146,7 +163,8 @@ public class TmNameplateCorpusServiceImpl extends ServiceImpl { + try { + log.info("铭牌语料可用许可授权数,总结和分类场景={}", semaphore.availablePermits()); + // 获取许可 - 如果没有可用许可会阻塞等待 + semaphore.acquire(); + //第一个业务场景 开始: 总结和分类业务场景 + JSONObject execDifyFlow = diFyService.executeDifyFlow(diFyImageReq, BusinessTypeEnum.SMART_ASSISTANT_NAMEPLATE.getCode(), + JSONObject.toJSONString(corpusReportDTO), null); + log.info("runDify execDifyFlow ,铭牌语料 ,总结和分类业务,返回: {}", execDifyFlow); + }catch (Exception e){ + log.error("执行Dify失败: customerFlowId={}, 错误信息: {}", + item.getCustomerFlowId(), e.getMessage(), e); + }finally { + // 释放许可 + semaphore.release(); + } + }, executor); + long endTime = System.currentTimeMillis(); + log.info("第一个业务场景(总结和分类)执行时间: {} ms", (endTime - startTime)); + //第一个业务场景, 结束 + + long startTime2 = System.currentTimeMillis(); + try { + log.info("铭牌语料可用许可授权数,画像场景={}", semaphore.availablePermits()); + // 获取许可 - 如果没有可用许可会阻塞等待 + semaphore.acquire(); + + CompletableFuture.runAsync(() -> { + + //"analysisScene": "分析类型", // 1、企微会话 2、AI通话录音 3、AI铭牌(客流) 4、AI铭牌(试驾) + DiFyReq diFyImageReq2 = new DiFyReq(); + diFyImageReq2.setUser(ConstantStr.corpus_user); + Map inputMap2 = new HashMap<>(); + inputMap2.put("businessId", item.getCustomerFlowId()); + inputMap2.put("communicateDate", item.getNameplateEndTime().toString()); + inputMap2.put("analysisScene", "3"); + inputMap2.put("version", 2); + inputMap2.put("chat", corpusChat); + diFyImageReq2.setInputs(inputMap2); + // 创建新的DiFyReq对象以避免线程安全问题 + diFyImageReq2.setFlowId(oneTokenPortrait); + log.info("runDify execDifyFlow ,客户画像场景 ,token-{} , 对象: {}", oneTokenPortrait, diFyImageReq2); + JSONObject execDifyFlowForPortrait = diFyService.executeDifyFlow(diFyImageReq2, BusinessTypeEnum.CORPUS_PORTRAIT_NAMEPLATE.getCode(), + JSONObject.toJSONString(corpusReportDTO), null); + updateNameplate(execDifyFlowForPortrait.toJSONString()); + log.info("runDify execDifyFlow ,铭牌语料 ,客户画像场景,返回: {}", execDifyFlowForPortrait); + + }, executor); + + } catch (Exception e) { + log.error("执行Dify失败: customerFlowId={}, 错误信息: {}", + item.getCustomerFlowId(), e.getMessage(), e); + } finally { + // 释放许可 + semaphore.release(); + } + + + long endTime2 = System.currentTimeMillis(); + log.info("第二个业务场景(用户画像)执行时间: {} ms", (endTime2 - startTime2)); + + log.info("nameplate铭牌业务场景处理完总额和客户画像的总时间,启动线程 :{}", (endTime2-startTime)); + // 等待两个任务完成 + long endTime3 = System.currentTimeMillis(); + log.info("nameplate铭牌业务场景处理完总额和客户画像的总时间,执行完成:{}", (endTime3-startTime)); } catch (Exception e) { - log.error("nameplate processItem error {}", e.getMessage()); + log.error("nameplate processItem error {}", e); } } @@ -166,56 +251,94 @@ public class TmNameplateCorpusServiceImpl extends ServiceImpl updateNameplate(String message) { - if(StringUtils.isNotEmpty(message)){ - JSONObject analysisResp = JSONObject.parseObject(message); - String aiAnalysisRequestId = analysisResp.getString("aiAnalysisRequestId"); - String difyResponse = analysisResp.getString("difyResponse"); - String customerFlowId = analysisResp.getString("customerFlowId"); - Optional.ofNullable(aiAnalysisRequestLogsService.queryByAiAnalysisRequestId(aiAnalysisRequestId)) - .orElseThrow(() -> new IllegalArgumentException("AiAnalysisRequestId查询对象为空!")); + try { + if (StringUtils.isNotEmpty(message)) { + log.info("updateNameplate 回调 message: {}", message); + JSONObject analysisResp = JSONObject.parseObject(message); + String aiAnalysisRequestId = analysisResp.getString(aiAnalysisRequestIdFinal); + String difyResponse = analysisResp.getString("difyResponse"); + String customerFlowId = analysisResp.getString("customerFlowId"); + String businessType; - AiAnalysisRequestLogs aiAnalysisRequestLogs = new AiAnalysisRequestLogs(); - aiAnalysisRequestLogs.setAiAnalysisRequestId(aiAnalysisRequestId); - aiAnalysisRequestLogs.setDifyResponse(difyResponse); - - JSONObject difyJson = JSONObject.parseObject(difyResponse); - aiAnalysisRequestLogs.setBusinessResponse(difyJson.getString("outputs")); - aiAnalysisRequestLogsService.saveAiAnalysisRequestLogs(aiAnalysisRequestLogs); - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); - queryWrapper.eq(TmNameplateCorpus::getCustomerFlowId, customerFlowId); - List tmNameplateCorpusList = tmNameplateCorpusMapper.selectList(queryWrapper); - if (CollectionUtils.isNotEmpty(tmNameplateCorpusList)){ - sendNameplateLto(difyJson,aiAnalysisRequestId, tmNameplateCorpusList.get(0)); - LambdaQueryWrapper errorQueryWrapper = new LambdaQueryWrapper<>(); - errorQueryWrapper.eq(AiAnalysisErrors::getAiAnalysisRequestId, aiAnalysisRequestId); - errorQueryWrapper.eq(AiAnalysisErrors::getAiAnalysisErrorHandlingStatus, "0"); - errorQueryWrapper.eq(AiAnalysisErrors::getIsDeleted, "0"); - AiAnalysisErrors oldAiAnalysisErrors = aiAnalysisErrorsMapper.selectOne(errorQueryWrapper); - if (oldAiAnalysisErrors != null) { - aiAnalysisErrorsMapper.update(AiAnalysisErrors.builder().aiAnalysisRequestId(oldAiAnalysisErrors.getAiAnalysisRequestId()).aiAnalysisErrorHandlingStatus("1").build(), errorQueryWrapper); + AiAnalysisRequestLogs aiAnalysisRequestLogs = new AiAnalysisRequestLogs(); + JSONObject difyJson = analysisResp; + if (StringUtils.isEmpty(customerFlowId)) { // 客户画像场景 + log.info("customerFlowId 为空,客户画像场景"); + String text = analysisResp.getString(outputsFinal); + JSONObject outputs = JSONObject.parseObject(text); + customerFlowId = outputs.getString("businessId"); + aiAnalysisRequestId = analysisResp.getString(aiAnalysisRequestIdFinal); + businessType = BusinessTypeEnum.CORPUS_PORTRAIT_NAMEPLATE.getCode(); + aiAnalysisRequestLogs.setBusinessResponse(text); + } else { //总结+分类 + businessType = BusinessTypeEnum.SMART_ASSISTANT_NAMEPLATE.getCode(); + difyJson = JSONObject.parseObject(difyResponse); + aiAnalysisRequestLogs.setBusinessResponse(difyJson.getString(outputsFinal)); } + + + Optional.ofNullable(aiAnalysisRequestLogsService.queryByAiAnalysisRequestId(aiAnalysisRequestId)) + .orElseThrow(() -> new IllegalArgumentException("AiAnalysisRequestId查询对象为空!")); + aiAnalysisRequestLogs.setAiAnalysisRequestId(aiAnalysisRequestId); + aiAnalysisRequestLogs.setDifyResponse(difyResponse); + aiAnalysisRequestLogs.setAiAnalysisRequestType(businessType); + aiAnalysisRequestLogsService.saveAiAnalysisRequestLogs(aiAnalysisRequestLogs); + + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); + queryWrapper.eq(TmNameplateCorpus::getCustomerFlowId, customerFlowId); + List tmNameplateCorpusList = tmNameplateCorpusMapper.selectList(queryWrapper); + if (CollectionUtils.isNotEmpty(tmNameplateCorpusList)) { //这是补偿机制 + log.info("tmNameplateCorpusList:{}", tmNameplateCorpusList.toString()); + + sendNameplateLto(difyJson, aiAnalysisRequestId, tmNameplateCorpusList.get(0), businessType); + LambdaQueryWrapper errorQueryWrapper = new LambdaQueryWrapper<>(); + errorQueryWrapper.eq(AiAnalysisErrors::getAiAnalysisRequestId, aiAnalysisRequestId); + errorQueryWrapper.eq(AiAnalysisErrors::getAiAnalysisErrorHandlingStatus, "0"); + errorQueryWrapper.eq(AiAnalysisErrors::getIsDeleted, "0"); + AiAnalysisErrors oldAiAnalysisErrors = aiAnalysisErrorsMapper.selectOne(errorQueryWrapper); + if (oldAiAnalysisErrors != null) { + aiAnalysisErrorsMapper.update(AiAnalysisErrors.builder().aiAnalysisRequestId(oldAiAnalysisErrors.getAiAnalysisRequestId()).aiAnalysisErrorHandlingStatus("1").build(), errorQueryWrapper); + } + } else { + log.info("没找到语料记录"); + } + return ResultMsg.ok(); } - return ResultMsg.ok(); + } catch (Exception e) { + log.error("updateNameplate error {}", e.getMessage()); + } + return ResultMsg.failed(); } @Override