去掉mock
This commit is contained in:
@@ -77,22 +77,5 @@ public class QiWeiCorpusJob {
|
||||
return ResultMsg.ok();
|
||||
}
|
||||
|
||||
@PostMapping("qiWeiCorpusTaskMock")
|
||||
public ResultMsg qiWeiCorpusTaskMock(@RequestBody String paramJson) {
|
||||
try {
|
||||
// 获取任务参数
|
||||
String param = XxlJobHelper.getJobParam();
|
||||
if(StringUtils.isEmpty(param)){
|
||||
param = paramJson;
|
||||
}
|
||||
// 执行业务逻辑
|
||||
XxlJobHelper.log("任务参数: {}", param);
|
||||
log.info("qiWeiCorpusTask 企微语料查询处理:{}",param);
|
||||
tmOdsVdqwMessagearchivingService.runQiWeiCorpusDifyMock(param);
|
||||
} catch (Exception e) {
|
||||
log.error("processMessageByTask 定时任务补偿处理消息异常",e.getMessage());
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return ResultMsg.ok();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -12,6 +12,4 @@ public interface TmOdsVdqwMessagearchivingService extends IService<TmOdsVdqwMess
|
||||
void runQiWeiCorpusDify(String paramJson);
|
||||
|
||||
|
||||
void runQiWeiCorpusDifyMock(String paramJson);
|
||||
|
||||
}
|
||||
@@ -257,68 +257,4 @@ public class TmOdsVdqwMessagearchivingServiceImpl extends ServiceImpl<TmOdsVdqwM
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void runQiWeiCorpusDifyMock(String paramJson) {
|
||||
log.info("runQiWeiCorpusDify paramJson {}", paramJson);
|
||||
// 获取当前日期
|
||||
LocalDate today = LocalDate.now();
|
||||
// 获取前一天日期
|
||||
LocalDate yesterday = today.minusDays(1);
|
||||
boolean retry = false;
|
||||
// 格式化输出
|
||||
String formattedDate = yesterday.toString(); // 默认格式为 yyyy-MM-dd
|
||||
String statTime = formattedDate.concat(" 00:00:00");
|
||||
String endTime = formattedDate.concat(" 23:59:59");
|
||||
if (StringUtils.isNotBlank(paramJson)) {
|
||||
JSONObject paramJsonObj = JSONObject.parseObject(paramJson);
|
||||
if (null != paramJsonObj && paramJsonObj.containsKey("statTime") && paramJsonObj.containsKey("endTime")) {
|
||||
statTime = paramJsonObj.getString("statTime");
|
||||
endTime = paramJsonObj.getString("endTime");
|
||||
retry = paramJsonObj.getBoolean("retry");
|
||||
}
|
||||
}
|
||||
String finalStatTime = statTime;
|
||||
String finalEndTime = endTime;
|
||||
|
||||
Integer total = tmOdsVdqwMessagearchivingMapper.countOdsVdqwMessageByData(statTime, endTime,retry);
|
||||
int totalPages = PageDto.getTotalPages(total, pageSize);
|
||||
|
||||
// 获取消息列表
|
||||
int optimalThreadPoolSize = Runtime.getRuntime().availableProcessors() + 1;
|
||||
log.info("获取的线程数:{}",optimalThreadPoolSize);
|
||||
// 创建线程池
|
||||
ExecutorService executor = Executors.newFixedThreadPool(optimalThreadPoolSize); // 根据需求调整线程池大小
|
||||
List<OdsVdqwMessageOTD> mockMessageList = new ArrayList<>();
|
||||
List<OdsVdqwMessageOTD> messageList = tmOdsVdqwMessagearchivingMapper.queryOdsVdqwMessageByData(statTime, endTime, 0, pageSize, retry);
|
||||
for(int i = 1; i <= 4000; i++){
|
||||
if(mockMessageList.size()<2000){
|
||||
mockMessageList.addAll(messageList);
|
||||
}
|
||||
}
|
||||
log.info("runQiWeiCorpusDifyMock 数据量:",mockMessageList.size());
|
||||
// mock
|
||||
for (int i = 1; i <= 5; i++) {
|
||||
log.info("runQiWeiCorpusDifyMock 数据量:{},第:{} 批次",mockMessageList.size(),i);
|
||||
// 处理查询到的数据
|
||||
// 使用 CompletableFuture 并行处理
|
||||
final int[] count = {1};
|
||||
CompletableFuture<?>[] futures = mockMessageList.stream()
|
||||
.map(item -> CompletableFuture.runAsync(() -> {
|
||||
log.info("runQiWeiCorpusDifyMock 异步执行{},第:{} 批次", count[0],JSONObject.toJSONString(item));
|
||||
try {
|
||||
processItem(item, finalStatTime, finalEndTime);
|
||||
} catch (Exception e) {
|
||||
log.error("处理企微语料失败: FromUserId={}, AcceptUserId={}, 异常: {}",
|
||||
item.getFromUserId(), item.getAcceptUserId(), e.getMessage(), e);
|
||||
}
|
||||
count[0] = count[0] +1;
|
||||
}, executor))
|
||||
.toArray(CompletableFuture[]::new);
|
||||
// 等待所有任务完成
|
||||
CompletableFuture.allOf(futures).join();
|
||||
}
|
||||
// 关闭线程池
|
||||
executor.shutdown();
|
||||
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user