优化结构

This commit is contained in:
zhangfan
2024-12-23 14:42:55 +08:00
parent a362e4e610
commit 597d7b6f3b
3 changed files with 167 additions and 174 deletions

View File

@@ -22,7 +22,7 @@ public class ClickHouseConfig {
}
@Bean(name = "clickHouseDataSource")
@ConfigurationProperties(prefix="spring.clickhouse")
@ConfigurationProperties(prefix="spring.clickhouse-ads-ai")
public DataSource clickHouseDataSource(){
DataSource dataSource = DataSourceBuilder.create().build();
return dataSource;

View File

@@ -58,8 +58,8 @@ public class MqMessageRecordServiceImpl extends ServiceImpl<MqMessageRecordMappe
/**
* 处理Mq消息
* @param message
* @return
* @param message 消息具体内容
* @return 返回处理情况
*/
@Override
public boolean processMessageByMQ(String message) {
@@ -83,90 +83,85 @@ public class MqMessageRecordServiceImpl extends ServiceImpl<MqMessageRecordMappe
log.error("数据入库失败(MQMessageRecord)", ex);
return false;
}
if (SinceTypeEnum.SINCETYPE2.getCode().equals(oldItem.getSinceType()) && SubSinceTypeEnum.SINCETYPE51.getCode().equals(oldItem.getSubSinceType()) ) {
log.info("辨别数据为分析请求");
DiffDefeatResult diffDefeatResult = null;
DiffDefeatanAlysis diffDefeatanAlysis = JSON.parseObject(JSON.toJSONString(oldItem.getData()), DiffDefeatanAlysis.class);
DiffDefeatAnalyseOutputResult response = this.processChatRecord(diffDefeatanAlysis);
DiffDefeatAnalyseOutput output = response.getDiffDefeatAnalyseOutput();
log.info("数据分析完成");
if(HandleStatusEnum.ANALYSIS_NORMAL.getCode().equals(output.getHandleStatus())){
curMQMessageRecord.setTaskStatus(MqTaskStatusEnum.ANALYSIS_COMPLETION.getCode());
diffDefeatResult = JSONObject.parseObject(output.getResultStr(), DiffDefeatResult.class);
} else if (HandleStatusEnum.ANALYSIS_CONTENT_EMPTY.getCode().equals(output.getHandleStatus())) {
curMQMessageRecord.setTaskStatus(MqTaskStatusEnum.ANALYSIS_COMPLETION.getCode());
diffDefeatResult = new DiffDefeatResult();
diffDefeatResult.setUserStatus("无效通话");
diffDefeatResult.setAppointmentResult("因为通话时间太短没有ASR转义文本因此判断为无效通话");
} else {
curMQMessageRecord.setTaskStatus(MqTaskStatusEnum.ANALYSIS_FAILURE.getCode());
}
curMQMessageRecord.setRespCode(output.getHandleStatus() == null ? "" : output.getHandleStatus().toString());
curMQMessageRecord.setRespContent(output.getResultStr());
this.updateById(curMQMessageRecord);
log.info("数据更改分析状态完成");
if (MqTaskStatusEnum.ANALYSIS_COMPLETION.getCode().equals(curMQMessageRecord.getTaskStatus())){
log.info("数据分析结果正常");
DiffdefeatApprove curDiffdefeatApprove = new DiffdefeatApprove();
String userStatus = getUserStatus(diffDefeatResult.getUserStatus());
curDiffdefeatApprove.setFormId(oldItem.getFormId());
curDiffdefeatApprove.setBusinessId(diffDefeatanAlysis.getBusinessId());
curDiffdefeatApprove.setUserStatus(userStatus);
curDiffdefeatApprove.setAppointmentResult(diffDefeatResult.getAppointmentResult());
curDiffdefeatApprove.setInputStr(response.getContentStr());
curDiffdefeatApprove.setDefeatTime(diffDefeatanAlysis.getDefeatTime());
diffdefeatApproveService.save(curDiffdefeatApprove);
log.info("插入数据记录完成");
DiffDefeatanCallbak curDiffDefeatanCallbak = new DiffDefeatanCallbak();
curDiffDefeatanCallbak.setBusinessId(diffDefeatanAlysis.getBusinessId());
curDiffDefeatanCallbak.setResponseCode(HandleStatusEnum.ANALYSIS_NORMAL.getCode().toString());
curDiffDefeatanCallbak.setLabel(userStatus);
curDiffDefeatanCallbak.setDescribe(diffDefeatResult.getAppointmentResult());
RabbitMqToData rabbitMqToData = new RabbitMqToData();
rabbitMqToData.setFormId(oldItem.getFormId());
rabbitMqToData.setSinceType(SinceTypeEnum.SINCETYPE2.getCode());
rabbitMqToData.setSubSinceType(SubSinceTypeEnum.SINCETYPE53.getCode());
rabbitMqToData.setData(curDiffDefeatanCallbak);
String callbakInput = JSONObject.toJSONString(rabbitMqToData);
rabbitTemplate.convertAndSend(Constant.rabbitToFormQueue, message);
log.info("发送回调MQ完成: {}", callbakInput);
} else {
log.info("数据分析结果不正常,等待重试");
return false;
}
}
if (SinceTypeEnum.SINCETYPE2.getCode().equals(oldItem.getSinceType()) && SubSinceTypeEnum.SINCETYPE52.getCode().equals(oldItem.getSubSinceType())) {
log.info("辨别数据为审批结果");
com.volvo.ai.analytic.center.dto.req.DiffDefeatanApprove curDiffDefeatanApprove = JSON.parseObject(JSON.toJSONString(oldItem.getData()), com.volvo.ai.analytic.center.dto.req.DiffDefeatanApprove.class);
DiffdefeatApprove approveEntity = diffdefeatApproveService.lambdaQuery().eq(DiffdefeatApprove::getFormId, oldItem.getFormId()).last("limit 1").one();
if (approveEntity != null) {
try {
approveEntity.setApproveCode(curDiffDefeatanApprove.getApproveCode());
approveEntity.setApproveResult(curDiffDefeatanApprove.getApproveResult());
approveEntity.setApproveOpinion(curDiffDefeatanApprove.getApproveOpinion());
diffdefeatApproveService.updateById(approveEntity);
curMQMessageRecord.setTaskStatus(MqTaskStatusEnum.ANALYSIS_COMPLETION.getCode());
curMQMessageRecord.setRespCode(HandleStatusEnum.ANALYSIS_NORMAL.getCode().toString());
this.updateById(curMQMessageRecord);
log.info("审批结果更新完成");
} catch (Exception e) {
log.error("审批结果更新失败",e.getMessage());
curMQMessageRecord.setTaskStatus(MqTaskStatusEnum.ANALYSIS_FAILURE.getCode());
curMQMessageRecord.setRespContent(e.getMessage());
curMQMessageRecord.setRespCode(HandleStatusEnum.ANALYSIS_CALLING.getCode().toString());
this.updateById(curMQMessageRecord);
}
}
if (Objects.equals(SinceTypeEnum.SINCETYPE2.getCode(),oldItem.getSinceType()) && Objects.equals(SubSinceTypeEnum.SINCETYPE51.getCode(),oldItem.getSubSinceType())) {
this.processMqSinceType51(oldItem, curMQMessageRecord);
} else if (Objects.equals(SinceTypeEnum.SINCETYPE2.getCode(),oldItem.getSinceType()) && Objects.equals(SubSinceTypeEnum.SINCETYPE52.getCode(),oldItem.getSubSinceType())) {
this.processMqSinceType52(oldItem, curMQMessageRecord);
} else {
log.info("辨别数据为未知类型sinceType {},subSinceType {}", oldItem.getSinceType(), oldItem.getSubSinceType());
}
return true;
}
private void processMqSinceType51(RabbitMqFormData oldItem,MqMessageRecord curMQMessageRecord){
log.info("辨别数据为分析请求 {}" ,SubSinceTypeEnum.SINCETYPE51.getCode());
DiffDefeatanAlysis diffDefeatanAlysis = JSON.parseObject(JSON.toJSONString(oldItem.getData()), DiffDefeatanAlysis.class);
DiffDefeatAnalyseOutputResult response = this.processChatRecord(diffDefeatanAlysis);
DiffDefeatAnalyseOutput output = response.getDiffDefeatAnalyseOutput();
log.info("数据分析完成");
DiffDefeatResult diffDefeatResult;
if(Objects.equals(HandleStatusEnum.ANALYSIS_NORMAL.getCode(),output.getHandleStatus())){
diffDefeatResult = JSONObject.parseObject(output.getResultStr(), DiffDefeatResult.class);
curMQMessageRecord.setTaskStatus(MqTaskStatusEnum.ANALYSIS_COMPLETION.getCode());
} else if (Objects.equals(HandleStatusEnum.ANALYSIS_CONTENT_EMPTY.getCode(),output.getHandleStatus())) {
diffDefeatResult = new DiffDefeatResult();
diffDefeatResult.setUserStatus("无效通话");
diffDefeatResult.setAppointmentResult("因为通话时间太短没有ASR转义文本因此判断为无效通话");
curMQMessageRecord.setTaskStatus(MqTaskStatusEnum.ANALYSIS_COMPLETION.getCode());
} else {
diffDefeatResult = new DiffDefeatResult();
curMQMessageRecord.setTaskStatus(MqTaskStatusEnum.ANALYSIS_FAILURE.getCode());
}
curMQMessageRecord.setRespCode(output.getHandleStatus() == null ? "" : output.getHandleStatus().toString());
curMQMessageRecord.setRespContent(output.getResultStr());
this.updateById(curMQMessageRecord);
log.info("数据更改分析状态完成");
if (Objects.equals(MqTaskStatusEnum.ANALYSIS_COMPLETION.getCode(),curMQMessageRecord.getTaskStatus())){
log.info("数据分析结果正常");
DiffdefeatApprove diffdefeatApprove = new DiffdefeatApprove();
String userStatus = getUserStatus(diffDefeatResult.getUserStatus());
diffdefeatApprove.setFormId(oldItem.getFormId());
diffdefeatApprove.setBusinessId(diffDefeatanAlysis.getBusinessId());
diffdefeatApprove.setUserStatus(userStatus);
diffdefeatApprove.setAppointmentResult(diffDefeatResult.getAppointmentResult());
diffdefeatApprove.setInputStr(response.getContentStr());
diffdefeatApprove.setDefeatTime(diffDefeatanAlysis.getDefeatTime());
diffdefeatApproveService.save(diffdefeatApprove);
log.info("插入数据记录完成开始发送Mq消息");
this.toMqSinceType53(diffDefeatanAlysis,diffDefeatResult,oldItem);
} else {
log.info("数据分析结果不正常,等待重试");
}
}
private void processMqSinceType52(RabbitMqFormData oldItem,MqMessageRecord curMQMessageRecord){
log.info("辨别数据为分析请求 {}" ,SubSinceTypeEnum.SINCETYPE52.getCode());
com.volvo.ai.analytic.center.dto.req.DiffDefeatanApprove curDiffDefeatApprove = JSON.parseObject(JSON.toJSONString(oldItem.getData()), com.volvo.ai.analytic.center.dto.req.DiffDefeatanApprove.class);
DiffdefeatApprove approveEntity = diffdefeatApproveService.lambdaQuery().eq(DiffdefeatApprove::getFormId, oldItem.getFormId()).last("limit 1").one();
if (approveEntity != null) {
try {
approveEntity.setApproveCode(curDiffDefeatApprove.getApproveCode());
approveEntity.setApproveResult(curDiffDefeatApprove.getApproveResult());
approveEntity.setApproveOpinion(curDiffDefeatApprove.getApproveOpinion());
diffdefeatApproveService.updateById(approveEntity);
curMQMessageRecord.setTaskStatus(MqTaskStatusEnum.ANALYSIS_COMPLETION.getCode());
curMQMessageRecord.setRespCode(HandleStatusEnum.ANALYSIS_NORMAL.getCode().toString());
this.updateById(curMQMessageRecord);
log.info("审批结果更新完成");
} catch (Exception e) {
log.error("审批结果更新失败",e);
curMQMessageRecord.setTaskStatus(MqTaskStatusEnum.ANALYSIS_FAILURE.getCode());
curMQMessageRecord.setRespContent(e.getMessage());
curMQMessageRecord.setRespCode(HandleStatusEnum.ANALYSIS_CALLING.getCode().toString());
this.updateById(curMQMessageRecord);
}
}
}
/**
* 处理聊天和语音数据
* @return
*/
@Override
public DiffDefeatAnalyseOutputResult processChatRecord(DiffDefeatanAlysis input) {
@@ -175,7 +170,7 @@ public class MqMessageRecordServiceImpl extends ServiceImpl<MqMessageRecordMappe
//获取脱敏配置信息
List<DataMaskingRule> maskingRuleItems = dataMaskingRuleService.getDataMaskingRuleListByApplicationChannel(Constant.CHANNEL_DCC);
List<DiffDefeatCorpuItem> curDiffDefeatCorpuItems = new ArrayList<>();
List<DiffDefeatCorpuItem> curDiffDefeatCorpItems = new ArrayList<>();
// 根据SourceId查询通话信息
List<String> sourceIds = Optional.ofNullable(input.getCallList()).map(list -> list.stream().map(CallItem::getSourceId).collect(Collectors.toList())).orElse(Collections.emptyList());
if (!CollectionUtils.isEmpty(sourceIds)) {
@@ -183,28 +178,28 @@ public class MqMessageRecordServiceImpl extends ServiceImpl<MqMessageRecordMappe
String sourceId = sourceIds.stream().map(code -> "'"+code+"'").collect(Collectors.joining(","));
int asrCount = clickhouseJdbcTemplate.queryForObject("select count(1) from asr_speechdetail where source_id in ("+ sourceId+") and file_status='InProgress'",Integer.class);
if (asrCount > 0) {
log.info("存在未完成解析的通话信息");
output.setHandleStatus(HandleStatusEnum.ANALYSIS_CALLING.getCode());
output.setResultStr(HandleStatusEnum.ANALYSIS_CALLING.getMessage());
return new DiffDefeatAnalyseOutputResult(output, contentStr.toString());
}
//查询到的通话数据
List<Map<String, Object>> hishistoryList = clickhouseJdbcTemplate.queryForList("select id,msg_json,source_id from asr_hishistory where dialect_text=0 and source_id in ("+ sourceId+") order by id");
List<Map<String, Object>> historyList = clickhouseJdbcTemplate.queryForList("select id,msg_json,source_id from asr_hishistory where dialect_text=0 and source_id in ("+ sourceId+") order by id");
for (CallItem item : input.getCallList()) {
Optional<String> result = CollectionUtils.isEmpty(hishistoryList) ? Optional.empty():
hishistoryList.stream()
.filter(entity -> entity.get("source_id").equals(item.getSourceId()))
Optional<String> result = CollectionUtils.isEmpty(historyList) ? Optional.empty():
historyList.stream().filter(entity -> entity.get("source_id").equals(item.getSourceId()))
.findFirst().map(entity -> entity.get("msg_json") == null ? "" : entity.get("msg_json").toString());
DiffDefeatCorpuItem curDiffDefeatCorpuItem = new DiffDefeatCorpuItem();
curDiffDefeatCorpuItem.setSourceId(item.getSourceId());
curDiffDefeatCorpuItem.setCategory(CategoryEnum.PHONE_VOICE.getCode());
curDiffDefeatCorpuItem.setCorpuText(result.isPresent() ? result.get() : "");
curDiffDefeatCorpuItem.setHappenTime(item.getAudioTime());
curDiffDefeatCorpuItems.add(curDiffDefeatCorpuItem);
DiffDefeatCorpuItem curDiffDefeatCorpItem = new DiffDefeatCorpuItem();
curDiffDefeatCorpItem.setSourceId(item.getSourceId());
curDiffDefeatCorpItem.setCategory(CategoryEnum.PHONE_VOICE.getCode());
curDiffDefeatCorpItem.setCorpuText(result.orElse(""));
curDiffDefeatCorpItem.setHappenTime(item.getAudioTime());
curDiffDefeatCorpItems.add(curDiffDefeatCorpItem);
}
}
// 查询企微的数据
if (StringUtils.isBlank(input.getVdqwUserId())|| StringUtils.isBlank(input.getVdqwCustomerId())) {
if (StringUtils.isBlank(input.getVdqwUserId()) || StringUtils.isBlank(input.getVdqwCustomerId())) {
log.info("企微用户信息为空,不附加企微数据");
}else {
try {
@@ -213,7 +208,8 @@ public class MqMessageRecordServiceImpl extends ServiceImpl<MqMessageRecordMappe
List<Map<String, Object>> userList = clickhouseJdbcTemplate.queryForList(userIdSql);
if (!CollectionUtils.isEmpty(userList) && userList.size() == 2) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String startTime = sdf.format(DateUtil.offsetDay(input.getDefeatTime(), -3));
int offsetDay = 3;
String startTime = sdf.format(DateUtil.offsetDay(input.getDefeatTime(), -offsetDay));
String endTime = sdf.format(input.getDefeatTime() == null ? new Date() : input.getDefeatTime());
String user1 = userList.get(0).get("userid").toString();
String user2 = userList.get(1).get("userid").toString();
@@ -232,40 +228,39 @@ public class MqMessageRecordServiceImpl extends ServiceImpl<MqMessageRecordMappe
"order by seq;";
List<SessionItem> userInfoList = clickhouseJdbcTemplate.queryForList(sql, SessionItem.class);
Date sliceSatrtTime = sdf.parse(startTime);
Date slice1StopTime = DateUtil.offsetDay(sliceSatrtTime,1);
Date slice2StopTime = DateUtil.offsetDay(slice1StopTime,1);
Date slice3StopTime = sdf.parse(endTime);
this.addSliceData(curDiffDefeatCorpuItems, sliceSatrtTime, slice1StopTime, userInfoList);
this.addSliceData(curDiffDefeatCorpuItems, slice1StopTime, slice2StopTime, userInfoList);
this.addSliceData(curDiffDefeatCorpuItems, slice2StopTime, slice3StopTime, userInfoList);
for (int i=0;i<offsetDay;i++){
Date sliceStartTime = sdf.parse(startTime);
sliceStartTime = DateUtil.offsetDay(sliceStartTime,i);
Date sliceStopTime = DateUtil.offsetDay(sliceStartTime,i+1);
this.addSliceData(curDiffDefeatCorpItems, sliceStartTime, sliceStopTime, userInfoList);
}
}
} catch (ParseException e) {
log.error("企微数据处理异常",e);
}
}
// 对集合进行排序
curDiffDefeatCorpuItems.sort(Comparator.comparing(DiffDefeatCorpuItem::getHappenTime));
if (!CollectionUtils.isEmpty(curDiffDefeatCorpuItems)){
for (DiffDefeatCorpuItem item : curDiffDefeatCorpuItems) {
if (CategoryEnum.ENTERPRISE_WECHAT.getCode().equals(item.getCategory())) {
curDiffDefeatCorpItems.sort(Comparator.comparing(DiffDefeatCorpuItem::getHappenTime));
if (!CollectionUtils.isEmpty(curDiffDefeatCorpItems)){
for (DiffDefeatCorpuItem item : curDiffDefeatCorpItems) {
if (Objects.equals(CategoryEnum.ENTERPRISE_WECHAT.getCode(),item.getCategory())) {
contentStr.append(CategoryEnum.ENTERPRISE_WECHAT.getMessage()).append("\n");
contentStr.append(item.getCorpuText());
} else if (CategoryEnum.PHONE_VOICE.getCode().equals(item.getCategory()) && StringUtils.isNotEmpty(item.getCorpuText())) {
} else if (Objects.equals(CategoryEnum.PHONE_VOICE.getCode(),item.getCategory()) && StringUtils.isNotEmpty(item.getCorpuText())) {
// 反序列化CorpuText为KafkaJson对象
KafkaJson oldItem = JSON.parseObject(item.getCorpuText(), KafkaJson.class);
// 反序列化display为CollectTranscriberJobResponse对象
CollectTranscriberJobResponse resp = JSON.parseObject(oldItem.getDisplay(), CollectTranscriberJobResponse.class);
// 检查状态并处理Segments
if ("FINISHED".equals(resp.getStatus()) && !CollectionUtils.isEmpty(resp.getSegments())) {
StringBuffer vocStr = new StringBuffer();
if (Objects.equals("FINISHED",resp.getStatus()) && !CollectionUtils.isEmpty(resp.getSegments())) {
StringBuilder vocStr = new StringBuilder();
for (Segment segment : resp.getSegments()) {
if (RoleEnum.AGENT.getCode().equals(segment.getResult().getAnalysisInfo().getRole())) {
vocStr.append(RoleEnum.AGENT.getMessage()+"" + segment.getResult().getText() + "\n");
} else if (RoleEnum.USER.getCode().equals(segment.getResult().getAnalysisInfo().getRole())) {
vocStr.append(RoleEnum.USER.getMessage()+"" + segment.getResult().getText() + "\n");
if (Objects.equals(RoleEnum.AGENT.getCode(),segment.getResult().getAnalysisInfo().getRole())) {
vocStr.append(RoleEnum.AGENT.getMessage()).append("").append(segment.getResult().getText()).append("\n");
} else if (Objects.equals(RoleEnum.USER.getCode(),segment.getResult().getAnalysisInfo().getRole())) {
vocStr.append(RoleEnum.USER.getMessage()).append("").append(segment.getResult().getText()).append("\n");
} else {
vocStr.append(RoleEnum.OTHER.getMessage()+"" + segment.getResult().getText() + "\n");
vocStr.append(RoleEnum.OTHER.getMessage()).append("").append(segment.getResult().getText()).append("\n");
}
}
contentStr.append(vocStr);
@@ -293,9 +288,9 @@ public class MqMessageRecordServiceImpl extends ServiceImpl<MqMessageRecordMappe
diFyReq.setUser(user);
diFyReq.setFlowId(flowId);
diFyReq.setInputs(record);
JSONObject difyResult = (JSONObject) diFyService.getDiFyObject(diFyReq);
JSONObject difResult = (JSONObject) diFyService.getDiFyObject(diFyReq);
output.setHandleStatus(HandleStatusEnum.ANALYSIS_NORMAL.getCode());
output.setResultStr(difyResult.getString("result"));
output.setResultStr(difResult.getString("result"));
log.info("DiFy平台处理结果:{}", output.getResultStr());
return new DiffDefeatAnalyseOutputResult(output, contentStr.toString());
}
@@ -319,26 +314,26 @@ public class MqMessageRecordServiceImpl extends ServiceImpl<MqMessageRecordMappe
for (MqMessageRecord curMQMessageRecord : mqMessageRecords){
log.info("辨别数据为分析请求");
RabbitMqFormData oldItem = JSONObject.parseObject(curMQMessageRecord.getMessageContent(), RabbitMqFormData.class);
if (curMQMessageRecord.getSinceType() == 2 && curMQMessageRecord.getSubSinceType() == SubSinceTypeEnum.SINCETYPE51.getCode() ) {
try {
DiffDefeatResult diffDefeatResult = null;
try {
if (Objects.equals(SinceTypeEnum.SINCETYPE2.getCode(),curMQMessageRecord.getSinceType()) && Objects.equals(SubSinceTypeEnum.SINCETYPE51.getCode(),curMQMessageRecord.getSubSinceType())) {
DiffDefeatResult diffDefeatResult;
DiffDefeatanAlysis diffDefeatanAlysis = JSON.parseObject(JSON.toJSONString(oldItem.getData()), DiffDefeatanAlysis.class);
DiffDefeatAnalyseOutputResult response = this.processChatRecord(diffDefeatanAlysis);
DiffDefeatAnalyseOutput output = response.getDiffDefeatAnalyseOutput();
log.info("数据分析完成");
if(HandleStatusEnum.ANALYSIS_NORMAL.getCode().equals(output.getHandleStatus())){
if(Objects.equals(HandleStatusEnum.ANALYSIS_NORMAL.getCode(),output.getHandleStatus())){
curMQMessageRecord.setTaskStatus(MqTaskStatusEnum.ANALYSIS_COMPLETION.getCode());
diffDefeatResult = JSONObject.parseObject(output.getResultStr(), DiffDefeatResult.class);
} else if (HandleStatusEnum.ANALYSIS_CONTENT_EMPTY.getCode().equals(output.getHandleStatus())) {
} else if (Objects.equals(HandleStatusEnum.ANALYSIS_CONTENT_EMPTY.getCode(),output.getHandleStatus())) {
curMQMessageRecord.setTaskStatus(MqTaskStatusEnum.ANALYSIS_COMPLETION.getCode());
diffDefeatResult = new DiffDefeatResult();
diffDefeatResult.setUserStatus("无效通话");
diffDefeatResult.setAppointmentResult("因为通话时间太短没有ASR转义文本因此判断为无效通话");
} else {
diffDefeatResult = new DiffDefeatResult();
curMQMessageRecord.setTaskStatus(MqTaskStatusEnum.ANALYSIS_FAILURE.getCode());
}
if (curMQMessageRecord.getTaskStatus() == MqTaskStatusEnum.ANALYSIS_COMPLETION.getCode()){
String userStatus = getUserStatus(diffDefeatResult.getUserStatus());
if (Objects.equals(MqTaskStatusEnum.ANALYSIS_COMPLETION.getCode(),curMQMessageRecord.getTaskStatus())){
DiffdefeatApprove curDiffdefeatApprove = diffdefeatApproveService.lambdaQuery()
.eq(DiffdefeatApprove::getFormId, oldItem.getFormId())
.eq(DiffdefeatApprove::getBusinessId, curMQMessageRecord.getSubBizNo())
@@ -346,34 +341,22 @@ public class MqMessageRecordServiceImpl extends ServiceImpl<MqMessageRecordMappe
.last("limit 1").one();
if(curDiffdefeatApprove!=null){
curDiffdefeatApprove.setAppointmentResult(diffDefeatResult.getAppointmentResult());
curDiffdefeatApprove.setUserStatus(userStatus);
curDiffdefeatApprove.setUserStatus(getUserStatus(diffDefeatResult.getUserStatus()));
curDiffdefeatApprove.setInputStr(response.getContentStr());
diffdefeatApproveService.updateById(curDiffdefeatApprove);
} else {
log.info("数据分析结果正常");
curDiffdefeatApprove = new DiffdefeatApprove();
curDiffdefeatApprove.setFormId(oldItem.getFormId());
curDiffdefeatApprove.setBusinessId(diffDefeatanAlysis.getBusinessId());
curDiffdefeatApprove.setUserStatus(userStatus);
curDiffdefeatApprove.setUserStatus(getUserStatus(diffDefeatResult.getUserStatus()));
curDiffdefeatApprove.setAppointmentResult(diffDefeatResult.getAppointmentResult());
curDiffdefeatApprove.setInputStr(response.getContentStr());
curDiffdefeatApprove.setDefeatTime(diffDefeatanAlysis.getDefeatTime());
diffdefeatApproveService.save(curDiffdefeatApprove);
log.info("插入数据记录完成");
}
DiffDefeatanCallbak curDiffDefeatanCallbak = new DiffDefeatanCallbak();
curDiffDefeatanCallbak.setBusinessId(diffDefeatanAlysis.getBusinessId());
curDiffDefeatanCallbak.setResponseCode(HandleStatusEnum.ANALYSIS_NORMAL.getCode().toString());
curDiffDefeatanCallbak.setLabel(userStatus);
curDiffDefeatanCallbak.setDescribe(diffDefeatResult.getAppointmentResult());
RabbitMqToData rabbitMqToData = new RabbitMqToData();
rabbitMqToData.setFormId(oldItem.getFormId());
rabbitMqToData.setSinceType(2);
rabbitMqToData.setSubSinceType(SubSinceTypeEnum.SINCETYPE53.getCode());
rabbitMqToData.setData(curDiffDefeatanCallbak);
String callbakInput = JSONObject.toJSONString(rabbitMqToData);
rabbitTemplate.convertAndSend(Constant.rabbitToFormQueue, callbakInput);
log.info("发送回调MQ完成: {}", callbakInput);
this.toMqSinceType53(diffDefeatanAlysis,diffDefeatResult,oldItem);
}
curMQMessageRecord.setRespCode(output.getHandleStatus()+"");
curMQMessageRecord.setRespContent(output.getResultStr());
@@ -381,19 +364,9 @@ public class MqMessageRecordServiceImpl extends ServiceImpl<MqMessageRecordMappe
curMQMessageRecord.setRetryCount(curMQMessageRecord.getRetryCount()+1);
this.updateById(curMQMessageRecord);
log.info("数据更改分析状态完成");
} catch (Exception e) {
log.error("数据解析异常: {}", e.getMessage());
curMQMessageRecord.setTaskStatus(MqTaskStatusEnum.ANALYSIS_FAILURE.getCode());
curMQMessageRecord.setRespCode(HandleStatusEnum.ANALYSIS_CALLING.getCode().toString());
curMQMessageRecord.setRespContent(e.getMessage());
curMQMessageRecord.setLastRetryTime(currTime);
curMQMessageRecord.setRetryCount(curMQMessageRecord.getRetryCount()+1);
this.updateById(curMQMessageRecord);
}
}
if (curMQMessageRecord.getSinceType() == 2 && curMQMessageRecord.getSubSinceType() == SubSinceTypeEnum.SINCETYPE52.getCode()) {
try {
if (Objects.equals(curMQMessageRecord.getSinceType(),SinceTypeEnum.SINCETYPE2.getCode()) && Objects.equals(curMQMessageRecord.getSubSinceType(), SubSinceTypeEnum.SINCETYPE52.getCode())) {
DiffdefeatApprove approveEntity = diffdefeatApproveService.lambdaQuery()
.eq(DiffdefeatApprove::getFormId, oldItem.getFormId())
.eq(DiffdefeatApprove::getBusinessId, curMQMessageRecord.getSubBizNo())
@@ -413,29 +386,26 @@ public class MqMessageRecordServiceImpl extends ServiceImpl<MqMessageRecordMappe
curMQMessageRecord.setRetryCount(curMQMessageRecord.getRetryCount()+1);
this.updateById(curMQMessageRecord);
}
} catch (Exception e) {
log.error("数据解析异常: {}", e.getMessage());
curMQMessageRecord.setTaskStatus(MqTaskStatusEnum.ANALYSIS_FAILURE.getCode());
curMQMessageRecord.setRespCode(HandleStatusEnum.ANALYSIS_CALLING.getCode().toString());
curMQMessageRecord.setRespContent(e.getMessage());
curMQMessageRecord.setLastRetryTime(currTime);
curMQMessageRecord.setRetryCount(curMQMessageRecord.getRetryCount()+1);
this.updateById(curMQMessageRecord);
}
} catch (Exception e) {
log.error("数据处理异常", e);
curMQMessageRecord.setTaskStatus(MqTaskStatusEnum.ANALYSIS_FAILURE.getCode());
curMQMessageRecord.setRespCode(HandleStatusEnum.ANALYSIS_CALLING.getCode().toString());
curMQMessageRecord.setRespContent(e.getMessage());
curMQMessageRecord.setLastRetryTime(currTime);
curMQMessageRecord.setRetryCount(curMQMessageRecord.getRetryCount()+1);
this.updateById(curMQMessageRecord);
}
}
}
private String getUserStatus(String oldStr)
{
if (MessageConvertEnum.CONFIRMED.getCode().equals(oldStr)){
private String getUserStatus(String oldStr){
if (Objects.equals(MessageConvertEnum.CONFIRMED.getCode(),oldStr)){
return MessageConvertEnum.CONFIRMED.getMessage();
}else if (MessageConvertEnum.LOOK_ON.getCode().equals(oldStr)){
}else if (Objects.equals(MessageConvertEnum.LOOK_ON.getCode(),oldStr)){
return MessageConvertEnum.LOOK_ON.getMessage();
}else if (MessageConvertEnum.GIVE_UP.getCode().equals(oldStr)){
}else if (Objects.equals(MessageConvertEnum.GIVE_UP.getCode(),oldStr)){
return MessageConvertEnum.GIVE_UP.getMessage();
}else if (MessageConvertEnum.INVALID.getCode().equals(oldStr)){
}else if (Objects.equals(MessageConvertEnum.INVALID.getCode(),oldStr)){
return MessageConvertEnum.INVALID.getMessage();
}else {
return oldStr;
@@ -443,7 +413,6 @@ public class MqMessageRecordServiceImpl extends ServiceImpl<MqMessageRecordMappe
}
public void addSliceData(List<DiffDefeatCorpuItem> curDiffDefeatCorpuItems, Date sliceStartTime, Date sliceStopTime, List<SessionItem> sessionItems) {
// 使用Java 8的Stream API来过滤sessionItems
List<SessionItem> slices = sessionItems.stream()
.filter(a -> !a.getMsgtime().before(sliceStartTime) && a.getMsgtime().before(sliceStopTime))
.collect(Collectors.toList());
@@ -452,7 +421,7 @@ public class MqMessageRecordServiceImpl extends ServiceImpl<MqMessageRecordMappe
StringBuilder str = new StringBuilder();
// 遍历sessionItems构建字符串
for (SessionItem sessionItem : sessionItems) {
if ("text".equals(sessionItem.getMsgtype()) && "send".equals(sessionItem.getActiontype())) {
if (Objects.equals("text",sessionItem.getMsgtype()) && Objects.equals("send",sessionItem.getActiontype())) {
try {
// 反序列化content为ContentText对象
ContentText contentText = JSON.parseObject(sessionItem.getContent(), ContentText.class);
@@ -467,17 +436,33 @@ public class MqMessageRecordServiceImpl extends ServiceImpl<MqMessageRecordMappe
}
}
}
// 检查字符串是否不为空
if (str.length() > 0) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
DiffDefeatCorpuItem curDiffDefeatCorpuItem = new DiffDefeatCorpuItem();
curDiffDefeatCorpuItem.setSourceId(slices.get(0).getMsgid());
curDiffDefeatCorpuItem.setCategory(CategoryEnum.ENTERPRISE_WECHAT.getCode());
curDiffDefeatCorpuItem.setHappenTime(sdf.format(slices.get(0).getMsgtime()));
curDiffDefeatCorpuItem.setCorpuText(str.toString());
curDiffDefeatCorpuItems.add(curDiffDefeatCorpuItem);
DiffDefeatCorpuItem curItem = new DiffDefeatCorpuItem();
curItem.setSourceId(slices.get(0).getMsgid());
curItem.setCategory(CategoryEnum.ENTERPRISE_WECHAT.getCode());
curItem.setHappenTime(sdf.format(slices.get(0).getMsgtime()));
curItem.setCorpuText(str.toString());
curDiffDefeatCorpuItems.add(curItem);
}
}
}
public void toMqSinceType53(DiffDefeatanAlysis diffDefeatanAlysis, DiffDefeatResult diffDefeatResult, RabbitMqFormData oldItem){
String userStatus = getUserStatus(diffDefeatResult.getUserStatus());
DiffDefeatanCallbak curDiffDefeatCallback = new DiffDefeatanCallbak();
curDiffDefeatCallback.setBusinessId(diffDefeatanAlysis.getBusinessId());
curDiffDefeatCallback.setResponseCode(HandleStatusEnum.ANALYSIS_NORMAL.getCode().toString());
curDiffDefeatCallback.setLabel(userStatus);
curDiffDefeatCallback.setDescribe(diffDefeatResult.getAppointmentResult());
RabbitMqToData rabbitMqToData = new RabbitMqToData();
rabbitMqToData.setFormId(oldItem.getFormId());
rabbitMqToData.setSinceType(SinceTypeEnum.SINCETYPE2.getCode());
rabbitMqToData.setSubSinceType(SubSinceTypeEnum.SINCETYPE53.getCode());
rabbitMqToData.setData(curDiffDefeatCallback);
String callbackInput = JSONObject.toJSONString(rabbitMqToData);
rabbitTemplate.convertAndSend(Constant.rabbitToFormQueue, callbackInput);
log.info("发送回调MQ完成: {}", callbackInput);
}
}

View File

@@ -64,3 +64,11 @@ mybatis-plus:
type-aliases-package: com.volvo.reportbi.mapper
configuration:
call-setters-on-nulls: true
service:
dify:
url: https://artera-uat.digitalvolvo.com
dify:
user: voc
flowId: app-MVGxogM08CDCg7jMo7GNF6eS