@@ -22,13 +22,16 @@ import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.beans.factory.annotation.Value ;
import org.springframework.beans.factory.annotation.Value ;
import org.springframework.cloud.context.config.annotation.RefreshScope ;
import org.springframework.cloud.context.config.annotation.RefreshScope ;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor ;
import org.springframework.stereotype.Service ;
import org.springframework.stereotype.Service ;
import javax.annotation.Resource ;
import java.time.LocalDate ;
import java.time.LocalDate ;
import java.util.* ;
import java.util.* ;
import java.util.concurrent.CompletableFuture ;
import java.util.concurrent.CompletableFuture ;
import java.util.concurrent.ExecutorService ;
import java.util.concurrent.ExecutorService ;
import java.util.concurrent.Executors ;
import java.util.concurrent.Executors ;
import java.util.concurrent.Semaphore ;
/**
/**
@@ -65,6 +68,20 @@ public class TmNameplateCorpusServiceImpl extends ServiceImpl<TmNameplateCorpusM
@Autowired
@Autowired
private DataMaskingRuleService dataMaskingRuleService ;
private DataMaskingRuleService dataMaskingRuleService ;
// 使用多线程并行执行两个业务场景
@Autowired
@Resource ( name = " threadPoolTaskExecutor " )
private ThreadPoolTaskExecutor executor ;
private Semaphore semaphore = new Semaphore ( 6 ) ; // 限制并发数
@Value ( " ${dify.corpus.portrait.oneToken} " )
private String oneTokenPortrait ;
private String outputsFinal = " outputs " ;
private String aiAnalysisRequestIdFinal = " aiAnalysisRequestId " ;
@Override
@Override
public void runNameplateCorpusDifyRetry ( String paramJson ) {
public void runNameplateCorpusDifyRetry ( String paramJson ) {
long startTime = System . currentTimeMillis ( ) ;
long startTime = System . currentTimeMillis ( ) ;
@@ -125,7 +142,7 @@ public class TmNameplateCorpusServiceImpl extends ServiceImpl<TmNameplateCorpusM
public void processItem ( TmNameplateCorpus item ) {
public void processItem ( TmNameplateCorpus item ) {
try {
try {
Optional . ofNullable ( item ) . filter ( tmNameplateCorpus - > tmNameplateCorpus . getCustomerFlowId ( ) ! = null & & tmNameplateCorpus . getNameplateContent ( ) ! = null ) . orElseThrow ( ( ) - > new RuntimeException ( " 铭牌语料为空 " ) ) ;
Optional . ofNullable ( item ) . filter ( tmNameplateCorpus - > tmNameplateCorpus . getCustomerFlowId ( ) ! = null & & tmNameplateCorpus . getNameplateContent ( ) ! = null ) . orElseThrow ( ( ) - > new RuntimeException ( " 铭牌语料为空 " ) ) ;
String customerFlowId = item . getCustomerFlowId ( ) ;
String customerFlowId = item . getCustomerFlowId ( ) ;
String nameplateContent = item . getNameplateContent ( ) ;
String nameplateContent = item . getNameplateContent ( ) ;
@@ -146,7 +163,8 @@ public class TmNameplateCorpusServiceImpl extends ServiceImpl<TmNameplateCorpusM
inputMap . put ( " model " , carModel ) ;
inputMap . put ( " model " , carModel ) ;
inputMap . put ( " customerFlowId " , item . getCustomerFlowId ( ) ) ;
inputMap . put ( " customerFlowId " , item . getCustomerFlowId ( ) ) ;
inputMap . put ( " analysisScene " , " 3 " ) ;
inputMap . put ( " analysisScene " , " 3 " ) ;
inputMap . put ( " version " , 2 ) ;
inputMap . put ( " version " , 2 ) ;
inputMap . put ( " businessType " , BusinessTypeEnum . SMART_ASSISTANT_NAMEPLATE . getCode ( ) ) ;
diFyImageReq . setInputs ( inputMap ) ;
diFyImageReq . setInputs ( inputMap ) ;
CorpusReportDTO corpusReportDTO = new CorpusReportDTO ( ) ;
CorpusReportDTO corpusReportDTO = new CorpusReportDTO ( ) ;
@@ -154,10 +172,77 @@ public class TmNameplateCorpusServiceImpl extends ServiceImpl<TmNameplateCorpusM
corpusReportDTO . setAnalysisScene ( 3l ) ;
corpusReportDTO . setAnalysisScene ( 3l ) ;
corpusReportDTO . setCarModel ( carModel ) ;
corpusReportDTO . setCarModel ( carModel ) ;
// 获取配置
// 获取配置
JSONObject execDifyFlow = diFyService . executeDifyFlow ( diFyImageReq , BusinessTypeEnum . SMART_ASSISTANT_NAMEPLATE . getCode ( ) , JSONObject . toJSONString ( corpusReportDTO ) , null ) ;
log . info ( " runDify execDifyFlow {} " , execDifyFlow ) ;
log . info ( " 铭牌语料第1个业务场景, 优先执行。 " ) ;
long startTime = System . currentTimeMillis ( ) ;
CompletableFuture . runAsync ( ( ) - > {
try {
log . info ( " 铭牌语料可用许可授权数,总结和分类场景={} " , semaphore . availablePermits ( ) ) ;
// 获取许可 - 如果没有可用许可会阻塞等待
semaphore . acquire ( ) ;
//第一个业务场景 开始: 总结和分类业务场景
JSONObject execDifyFlow = diFyService . executeDifyFlow ( diFyImageReq , BusinessTypeEnum . SMART_ASSISTANT_NAMEPLATE . getCode ( ) ,
JSONObject . toJSONString ( corpusReportDTO ) , null ) ;
log . info ( " runDify execDifyFlow ,铭牌语料 ,总结和分类业务,返回: {} " , execDifyFlow ) ;
} catch ( Exception e ) {
log . error ( " 执行Dify失败: customerFlowId={}, 错误信息: {} " ,
item . getCustomerFlowId ( ) , e . getMessage ( ) , e ) ;
} finally {
// 释放许可
semaphore . release ( ) ;
}
} , executor ) ;
long endTime = System . currentTimeMillis ( ) ;
log . info ( " 第一个业务场景(总结和分类)执行时间: {} ms " , ( endTime - startTime ) ) ;
//第一个业务场景, 结束
long startTime2 = System . currentTimeMillis ( ) ;
try {
log . info ( " 铭牌语料可用许可授权数,画像场景={} " , semaphore . availablePermits ( ) ) ;
// 获取许可 - 如果没有可用许可会阻塞等待
semaphore . acquire ( ) ;
CompletableFuture . runAsync ( ( ) - > {
//"analysisScene": "分析类型", // 1、企微会话 2、AI通话录音 3、AI铭牌(客流) 4、AI铭牌(试驾)
DiFyReq diFyImageReq2 = new DiFyReq ( ) ;
diFyImageReq2 . setUser ( ConstantStr . corpus_user ) ;
Map < String , Object > inputMap2 = new HashMap < > ( ) ;
inputMap2 . put ( " businessId " , item . getCustomerFlowId ( ) ) ;
inputMap2 . put ( " communicateDate " , item . getNameplateEndTime ( ) . toString ( ) ) ;
inputMap2 . put ( " analysisScene " , " 3 " ) ;
inputMap2 . put ( " version " , 2 ) ;
inputMap2 . put ( " chat " , corpusChat ) ;
diFyImageReq2 . setInputs ( inputMap2 ) ;
// 创建新的DiFyReq对象以避免线程安全问题
diFyImageReq2 . setFlowId ( oneTokenPortrait ) ;
log . info ( " runDify execDifyFlow ,客户画像场景 , token-{} , 对象: {} " , oneTokenPortrait , diFyImageReq2 ) ;
JSONObject execDifyFlowForPortrait = diFyService . executeDifyFlow ( diFyImageReq2 , BusinessTypeEnum . CORPUS_PORTRAIT_NAMEPLATE . getCode ( ) ,
JSONObject . toJSONString ( corpusReportDTO ) , null ) ;
updateNameplate ( execDifyFlowForPortrait . toJSONString ( ) ) ;
log . info ( " runDify execDifyFlow ,铭牌语料 ,客户画像场景,返回: {} " , execDifyFlowForPortrait ) ;
} , executor ) ;
} catch ( Exception e ) {
log . error ( " 执行Dify失败: customerFlowId={}, 错误信息: {} " ,
item . getCustomerFlowId ( ) , e . getMessage ( ) , e ) ;
} finally {
// 释放许可
semaphore . release ( ) ;
}
long endTime2 = System . currentTimeMillis ( ) ;
log . info ( " 第二个业务场景(用户画像)执行时间: {} ms " , ( endTime2 - startTime2 ) ) ;
log . info ( " nameplate铭牌业务场景处理完总额和客户画像的总时间,启动线程 : {} " , ( endTime2 - startTime ) ) ;
// 等待两个任务完成
long endTime3 = System . currentTimeMillis ( ) ;
log . info ( " nameplate铭牌业务场景处理完总额和客户画像的总时间,执行完成:{} " , ( endTime3 - startTime ) ) ;
} catch ( Exception e ) {
} catch ( Exception e ) {
log . error ( " nameplate processItem error {} " , e . getMessage ( ) );
log . error ( " nameplate processItem error {} " , e ) ;
}
}
}
}
@@ -166,56 +251,94 @@ public class TmNameplateCorpusServiceImpl extends ServiceImpl<TmNameplateCorpusM
@Override
@Override
public void sendNameplateLto ( JSONObject execDifyFlow , String aiAnalysisRequestId , TmNameplateCorpus tmNameplateCorpus ) {
public void sendNameplateLto ( JSONObject execDifyFlow , String aiAnalysisRequestId , TmNameplateCorpus tmNameplateCorpus , String businessType ) {
log . info ( " 铭牌sendNameplateLto getCustomerFlowId:{} " , tmNameplateCorpus . getCustomerFlowId ( ) ) ;
if ( null ! = execDifyFlow & & execDifyFlow . get ( " status " ) . equals ( " succeeded " ) ) {
try {
String text = execDifyFlow . getString ( " outputs " ) ;
// 发送MQ
log . info ( " 铭牌sendNameplateLto toJSONString :{} " , execDifyFlow ) ;
log . info ( " 铭牌send mq {} " , text ) ;
log . info ( " tmNameplateCorpus.tostrign:{} " , tmNameplateCorpus . toString ( ) ) ;
tmTelephoneCorpusService . sendMq ( CategoryEnum . NAMEPLATE_VOICE . getCode ( ) , text ) ;
if ( execDifyFlow . get ( " status " ) . equals ( " succeeded " ) ) {
String text = execDifyFlow . getString ( outputsFinal ) ;
// 发送MQ
log . info ( " 铭牌send mq 业务类型是:{}, Json 是:{} " , businessType , text ) ;
if ( BusinessTypeEnum . CORPUS_PORTRAIT_NAMEPLATE . getCode ( ) . equals ( businessType ) ) { // 客户画像
tmTelephoneCorpusService . sendMq ( CategoryEnum . NAMEPLATE_VOICE_PORTRAIT . getCode ( ) , text ) ;
} else { //一句话总结+分类
tmTelephoneCorpusService . sendMq ( CategoryEnum . NAMEPLATE_VOICE . getCode ( ) , text ) ;
}
try {
ttNameplateRecordMapper . insert ( TtNameplateRecord . builder ( ) . nameplateCorpusId ( tmNameplateCorpus . getId ( ) ) . customerFlowId ( tmNameplateCorpus . getCustomerFlowId ( ) ) . build ( ) ) ;
ttNameplateRecordMapper . insert ( TtNameplateRecord . builder ( ) . nameplateCorpusId ( tmNameplateCorpus . getId ( ) ) . customerFlowId ( tmNameplateCorpus . getCustomerFlowId ( ) ) . build ( ) ) ;
aiAnalysisRequestLogsService . saveAiAnalysisRequestLogs ( AiAnalysisRequestLogs . builder ( ) . aiAnalysisRequestId ( aiAnalysisRequestId ) . businessResponse ( text ) . build ( ) ) ;
aiAnalysisRequestLogsService . saveAiAnalysisRequestLogs ( AiAnalysisRequestLogs . builder ( ) . aiAnalysisRequestId ( aiAnalysisRequestId ) . businessResponse ( text ) . build ( ) ) ;
} catch ( Exception e ) {
log . info ( " 铭牌语料处理保存报告异常processItem: {} " , e ) ;
}
}
} catch ( Exception e ) {
log . info ( " 铭牌语料处理保存报告异常processItem: {} " , e . getMessage ( ) ) ;
}
}
}
}
@Override
@Override
public ResultMsg < Object > updateNameplate ( String message ) {
public ResultMsg < Object > updateNameplate ( String message ) {
if ( StringUtils . isNotEmpty ( message ) ) {
try {
JSONObject analysisResp = JSONObject . parseObject ( message ) ;
if ( StringUtils . isNotEmpty ( message ) ) {
String aiAnalysisRequestId = analysisResp . getString ( " aiAnalysisRequestId " ) ;
log . info ( " updateNameplate 回调 message: {} " , message ) ;
String difyResponse = analysisResp . getString ( " difyResponse " ) ;
JSONObject analysisResp = JSONObject . parseObject ( message ) ;
String customerFlow Id = analysisResp . getString ( " customerFlowId " ) ;
String aiAnalysisRequest Id = analysisResp . getString ( aiAnalysisRequestIdFinal ) ;
Optional . ofNullable ( aiA nalysisRequestLogsService . queryByAiAnalysisRequestId ( aiAnalysisRequestId ) )
String difyResponse = analysisResp . getString ( " difyResponse " ) ;
. orElseThrow ( ( ) - > new IllegalArgumentException ( " AiAnalysisRequestId查询对象为空! " ) ) ;
String customerFlowId = analysisResp . getString ( " customerFlowId " ) ;
String businessType ;
AiAnalysisRequestLogs aiAnalysisRequestLogs = new AiAnalysisRequestLogs ( ) ;
AiAnalysisRequestLogs aiAnalysisRequestLogs = new AiAnalysisRequestLogs ( ) ;
aiAnalysisRequestLogs . setAiAnalysisRequestId ( aiA nalysisRequestId ) ;
JSONObject difyJson = analysisResp ;
aiAnalysisRequestLogs . setDifyResponse ( difyResponse ) ;
if ( StringUtils . isEmpty ( customerFlowId ) ) { // 客户画像场景
log . info ( " customerFlowId 为空,客户画像场景 " ) ;
JSONObject difyJson = JSONObject . parseObject ( difyResponse ) ;
String text = analysisResp . getString ( outputsFinal ) ;
aiAnalysisRequestLogs . setBusinessResponse ( difyJson . getString ( " outputs " ) ) ;
JSONObject outputs = JSONObject . parseObject ( text ) ;
aiAnalysisRequestLogsService . saveAiAnalysisRequestLogs ( aiAnalysisRequestLogs ) ;
customerFlowId = outputs . getString ( " businessId " ) ;
LambdaQueryWrapper < TmNameplateCorpus > queryWrapper = new LambdaQueryWrapper < > ( ) ;
aiAnalysisRequestId = analysisResp . getString ( aiAnalysisRequestIdFinal ) ;
queryWrapper . eq ( TmNameplateCorpus : : getCustomerFlowId , customerFlowId ) ;
businessType = BusinessTypeEnum . CORPUS_PORTRAIT_NAMEPLATE . getCode ( ) ;
List < TmNameplateCorpus > tmNameplateCorpusList = tmNameplateCorpusMapper . selectList ( queryWrapper ) ;
aiAnalysisRequestLogs . setBusinessResponse ( text ) ;
if ( CollectionUtils . isNotEmpty ( tmNameplateCorpusList ) ) {
} else { //总结+分类
sendNameplateLto ( difyJson , aiAnalysisRequestId , tmNameplateCorpusList . get ( 0 ) ) ;
businessType = BusinessTypeEnum . SMART_ASSISTANT_NAMEPLATE . getCode ( ) ;
LambdaQueryWrapper < AiAnalysisErrors > errorQueryWrapper = new LambdaQueryWrapper < > ( ) ;
difyJson = JSONObject . parseObject ( difyResponse ) ;
errorQueryWrapper . eq ( AiAnalysisErrors : : getAiAnalysisRequestId , aiAnalysisRequestId ) ;
aiAnalysisRequestLogs . setBusinessResponse ( difyJson . getString ( outputsFinal ) ) ;
errorQueryWrapper . eq ( AiAnalysisErrors : : getAiAnalysisErrorHandlingStatus , " 0 " ) ;
errorQueryWrapper . eq ( AiAnalysisErrors : : getIsDeleted , " 0 " ) ;
AiAnalysisErrors oldAiAnalysisErrors = aiAnalysisErrorsMapper . selectOne ( errorQueryWrapper ) ;
if ( oldAiAnalysisErrors ! = null ) {
aiAnalysisErrorsMapper . update ( AiAnalysisErrors . builder ( ) . aiAnalysisRequestId ( oldAiAnalysisErrors . getAiAnalysisRequestId ( ) ) . aiAnalysisErrorHandlingStatus ( " 1 " ) . build ( ) , errorQueryWrapper ) ;
}
}
Optional . ofNullable ( aiAnalysisRequestLogsService . queryByAiAnalysisRequestId ( aiAnalysisRequestId ) )
. orElseThrow ( ( ) - > new IllegalArgumentException ( " AiAnalysisRequestId查询对象为空! " ) ) ;
aiAnalysisRequestLogs . setAiAnalysisRequestId ( aiAnalysisRequestId ) ;
aiAnalysisRequestLogs . setDifyResponse ( difyResponse ) ;
aiAnalysisRequestLogs . setAiAnalysisRequestType ( businessType ) ;
aiAnalysisRequestLogsService . saveAiAnalysisRequestLogs ( aiAnalysisRequestLogs ) ;
LambdaQueryWrapper < TmNameplateCorpus > queryWrapper = new LambdaQueryWrapper < > ( ) ;
queryWrapper . eq ( TmNameplateCorpus : : getCustomerFlowId , customerFlowId ) ;
List < TmNameplateCorpus > tmNameplateCorpusList = tmNameplateCorpusMapper . selectList ( queryWrapper ) ;
if ( CollectionUtils . isNotEmpty ( tmNameplateCorpusList ) ) { //这是补偿机制
log . info ( " tmNameplateCorpusList:{} " , tmNameplateCorpusList . toString ( ) ) ;
sendNameplateLto ( difyJson , aiAnalysisRequestId , tmNameplateCorpusList . get ( 0 ) , businessType ) ;
LambdaQueryWrapper < AiAnalysisErrors > errorQueryWrapper = new LambdaQueryWrapper < > ( ) ;
errorQueryWrapper . eq ( AiAnalysisErrors : : getAiAnalysisRequestId , aiAnalysisRequestId ) ;
errorQueryWrapper . eq ( AiAnalysisErrors : : getAiAnalysisErrorHandlingStatus , " 0 " ) ;
errorQueryWrapper . eq ( AiAnalysisErrors : : getIsDeleted , " 0 " ) ;
AiAnalysisErrors oldAiAnalysisErrors = aiAnalysisErrorsMapper . selectOne ( errorQueryWrapper ) ;
if ( oldAiAnalysisErrors ! = null ) {
aiAnalysisErrorsMapper . update ( AiAnalysisErrors . builder ( ) . aiAnalysisRequestId ( oldAiAnalysisErrors . getAiAnalysisRequestId ( ) ) . aiAnalysisErrorHandlingStatus ( " 1 " ) . build ( ) , errorQueryWrapper ) ;
}
} else {
log . info ( " 没找到语料记录 " ) ;
}
return ResultMsg . ok ( ) ;
}
}
return ResultMsg . ok ( ) ;
} catch ( Exception e ) {
log . error ( " updateNameplate error {} " , e . getMessage ( ) ) ;
}
}
return ResultMsg . failed ( ) ;
return ResultMsg . failed ( ) ;
}
}
@Override
@Override