企微代码重写

This commit is contained in:
ZLI263
2025-09-22 11:11:33 +08:00
parent 2217027d26
commit 154711851f
2 changed files with 53 additions and 2 deletions

View File

@@ -38,6 +38,7 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
@@ -76,6 +77,9 @@ public class TmOdsVdqwMessagearchivingServiceImpl extends ServiceImpl<TmOdsVdqwM
private String topic;
@Value("${dify.corpus.qiweiToken}")
private String qiweiToken;
@Value("${dify.corpus.portrait.oneToken}")
private String oneTokenPortrait;
@Value("${batch.size}")
public int pageSize = 100;
@@ -88,8 +92,13 @@ public class TmOdsVdqwMessagearchivingServiceImpl extends ServiceImpl<TmOdsVdqwM
@Autowired
private TmTelephoneCorpusService tmTelephoneCorpusService;
@Value("${batch.threadNum}")
int threadNum =2;
private Semaphore semaphore = new Semaphore(threadNum); // 限制并发数
String consultantIdStr ="consultantId";
@Override
public void runQiWeiCorpusDify(String paramJson) {
@@ -120,15 +129,16 @@ public class TmOdsVdqwMessagearchivingServiceImpl extends ServiceImpl<TmOdsVdqwM
// 获取消息列表
int optimalThreadPoolSize = Runtime.getRuntime().availableProcessors() + 1;
log.info("企微处理总数据量:{},总页数:{},获取的线程数:{}",totalPages,totalPages,optimalThreadPoolSize);
log.info("QiWeiCorpus企微处理总数据量:{},设置线程数:{},获取的线程数:{}",totalPages,threadNum,optimalThreadPoolSize);
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(optimalThreadPoolSize); // 根据需求调整线程池大小
ExecutorService executor = Executors.newFixedThreadPool(threadNum); // 根据需求调整线程池大小
try {
for (int i = 1; i <= totalPages; i++) {
int offset = (i - 1) * pageSize;
List<OdsVdqwMessageOTD> messageList = tmOdsVdqwMessagearchivingMapper.queryOdsVdqwMessageByData(statTime, endTime, offset, pageSize, retry);
// 处理查询到的数据
log.info("待处理企微聊天列表messageList,{}",messageList.toString());
// 使用 CompletableFuture 并行处理
CompletableFuture<?>[] futures = messageList.stream()
.map(item -> CompletableFuture.runAsync(() -> {
@@ -231,6 +241,46 @@ public class TmOdsVdqwMessagearchivingServiceImpl extends ServiceImpl<TmOdsVdqwM
log.info(" 企业语料处理保存报告异常processItem{} ", e);
}
}
executePortraitTask(unionId, maxMsgTimeItem, chatList, corpusReportDTO);
}
}
/**
* 执行客户画像任务
*/
private void executePortraitTask(String unionId, OdsVdqwMessageOTD maxMsgTimeItem, StringBuffer chatList, CorpusReportDTO corpusReportDTO) {
try {
//"analysisScene": "分析类型", // 1、企微会话 2、AI通话录音 3、AI铭牌(客流) 4、AI铭牌(试驾)
DiFyReq diFyImageReq2 = new DiFyReq();
diFyImageReq2.setUser(ConstantStr.corpus_user);
Map<String, Object> inputMap2 = new HashMap<>();
inputMap2.put("businessId", unionId);
inputMap2.put(consultantIdStr, corpusReportDTO.getUserId());//顾问id
inputMap2.put("communicateDate", DateUtil.format(maxMsgTimeItem.getMsgTime(), DatePattern.NORM_DATETIME_PATTERN));
inputMap2.put("analysisScene", "1");
inputMap2.put("version", 2);
inputMap2.put("chat", chatList.toString());
diFyImageReq2.setInputs(inputMap2);
// 创建新的DiFyReq对象以避免线程安全问题
diFyImageReq2.setFlowId(oneTokenPortrait);
log.info("runDify execDifyFlow ,客户画像场景 token-{} , 对象: {}", oneTokenPortrait, diFyImageReq2);
JSONObject execDifyFlowForPortrait = diFyService.executeDifyFlow(diFyImageReq2, BusinessTypeEnum.CORPUS_PORTRAIT_NAMEPLATE.getCode(),
JSONObject.toJSONString(corpusReportDTO), null);
log.info("runDify execDifyFlow ,客户画像场景 ,返回-{} ", execDifyFlowForPortrait);
JSONObject text = execDifyFlowForPortrait.getJSONObject("outputs");
// 发送MQ
log.info("send mq企微客户画像 {}", text);
tmTelephoneCorpusService.sendMq( CategoryEnum.ENTERPRISE_WECHAT_PORTRAIT.getCode(), text.toJSONString());
aiAnalysisRequestLogsService.saveAiAnalysisRequestLogs(AiAnalysisRequestLogs.builder().aiAnalysisRequestId(execDifyFlowForPortrait.getString("aiAnalysisRequestId")).businessResponse(text.toJSONString()).build());
log.info("runDify execDifyFlow ,企微语料 ,客户画像场景,返回: {}", execDifyFlowForPortrait);
} catch (Exception e) {
log.error("执行客户画像任务异常", e);
}
}