特邀发言官 补偿机制

This commit is contained in:
lxu75
2025-06-20 15:43:14 +08:00
parent 9550cf76e0
commit 1c1a78001a
2 changed files with 56 additions and 10 deletions

View File

@@ -23,7 +23,7 @@ public class InvitedSpeakerJob {
public ResultMsg invitedSpeakerJobTask() {
try {
log.info("invitedSpeakerJobTask 开始执行特邀发言官定时任务补偿处理消息");
spokesmanService.spokesManCallBack("111");
spokesmanService.spokesManTask();
} catch (Exception e) {
log.error("invitedSpeakerJobTask 定时任务补偿处理消息异常",e.getMessage());
throw new RuntimeException(e);

View File

@@ -1,13 +1,14 @@
package com.volvo.ai.analytic.center.service;
import com.alibaba.cloud.commons.lang.StringUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.volvo.ai.analytic.center.dto.req.DiFyReq;
import com.volvo.ai.analytic.center.dto.req.DifyImageWorkFlow;
import com.volvo.ai.analytic.center.dto.req.SpokesManDTO;
import com.volvo.ai.analytic.center.dto.req.*;
import com.volvo.ai.analytic.center.entity.AiAnalysisErrors;
import com.volvo.ai.analytic.center.entity.AiAnalysisRequestLogs;
import com.volvo.ai.analytic.center.entity.AiAnalyticBusinessConfig;
@@ -66,14 +67,12 @@ public class SpokesManService {
// 生成ai分析请求id
String aiAnalysisRequestId = AiAnalysisUtils.getAiAnalysisRequestId(BusinessTypeEnum.COMMUNITYTARGET.getCode());
try {
JSONObject jsonObject = JSON.parseObject(req);
String data = jsonObject.getString("data");
// 异步保存请求日志
syncSaveRequestLogs(JSON.toJSONString(data), aiAnalysisRequestId);
syncSaveRequestLogs(JSON.toJSONString(req), aiAnalysisRequestId);
//把req转成SpokesManDTO对象
SpokesManDTO spokesManDTO = JSON.parseObject(data, SpokesManDTO.class);
SpokesManDTO spokesManDTO = JSON.parseObject(req, SpokesManDTO.class);
if (spokesManDTO == null || spokesManDTO.getFiles() == null || spokesManDTO.getFiles().isEmpty()
|| spokesManDTO.getPlatform() == null || spokesManDTO.getTheme() == null) {
|| spokesManDTO.getPlatform() == null) {
log.error("社区特邀发言官入参错误");
throw new RuntimeException("社区特邀发言官入参错误");
}
@@ -128,7 +127,54 @@ public class SpokesManService {
.build());
}
}
//特邀发言补偿
public void spokesManTask() {
try {
//捞取异常表中属于社区的异常数据
List<AiAnalysisErrors> aiAnalysisErrors = aiAnalysisErrorsMapper.selectList(new LambdaQueryWrapper<AiAnalysisErrors>()
.eq(AiAnalysisErrors::getAiAnalysisRequestType, BusinessTypeEnum.SPOKESMAN.getCode())
.eq(AiAnalysisErrors::getAiAnalysisErrorHandlingStatus, "0")
.lt(AiAnalysisErrors::getRetryCount, 4));
if (aiAnalysisErrors != null && aiAnalysisErrors.size() > 0) {
//根据ai_analysis_request_id获取AiAnalysisRequestLogs表中的对应的dify_request字段
for (AiAnalysisErrors aiAnalysisError : aiAnalysisErrors) {
try {
AiAnalysisRequestLogs aiAnalysisRequestLogs = aiAnalysisRequestLogsMapper.selectOne(new LambdaQueryWrapper<AiAnalysisRequestLogs>()
.eq(AiAnalysisRequestLogs::getAiAnalysisRequestId, aiAnalysisError.getAiAnalysisRequestId()));
if (aiAnalysisRequestLogs != null) {
//获取dify_request字段
String message = aiAnalysisRequestLogs.getBusinessRequest();
String difyRequest = aiAnalysisRequestLogs.getDifyRequest();
if (StringUtils.isNotEmpty(message) || StringUtils.isNotEmpty(difyRequest)) {
SpokesManDTO spokesManDTO = JSON.parseObject(message, SpokesManDTO.class); DiFyReq diFyReq = JSON.parseObject(difyRequest, DiFyReq.class);
//调用workflow
JSONObject difSpokesManResult = (JSONObject) diFyService.getDiFyObject(diFyReq);
difSpokesManResult.put("communityRequestId", spokesManDTO.getCommunityRequestId());
difSpokesManResult.put("aiAnalysisRequestId", aiAnalysisError.getAiAnalysisRequestId());
//处理发布时间
updatePublishedTime(difSpokesManResult);
//返回结果推送到社区的MQ
rocketMQTemplate.syncSend(topic, difSpokesManResult.toString());
//异步更新dify响应日志
syncUpdateDiFyResponse(difSpokesManResult, aiAnalysisError.getAiAnalysisRequestId());
} else {
aiAnalysisErrorsMapper.update(new AiAnalysisErrors(), new LambdaUpdateWrapper<AiAnalysisErrors>()
.eq(AiAnalysisErrors::getAiAnalysisRequestId, aiAnalysisError.getAiAnalysisRequestId())
.set(AiAnalysisErrors::getAiAnalysisErrorHandlingStatus, "2"));
}
}
} catch (Exception e) {
log.error("补偿社区消息,AIID:{},异常:{}", aiAnalysisError.getAiAnalysisRequestId(), e);
aiAnalysisErrorsMapper.update(new AiAnalysisErrors(), new LambdaUpdateWrapper<AiAnalysisErrors>()
.eq(AiAnalysisErrors::getAiAnalysisRequestId, aiAnalysisError.getAiAnalysisRequestId())
.set(AiAnalysisErrors::getRetryCount, aiAnalysisError.getRetryCount() + 1));
}
}
}
} catch (Exception e) {
log.error("处理社区异常消息异常:{}", e);
}
}
public static void updatePublishedTime(JSONObject rootJson) {
String contentAnalysisResultStr = rootJson.getString("ContentAnalysisResult");
if (contentAnalysisResultStr == null || contentAnalysisResultStr.isEmpty()) return;