异常关闭线程

(cherry picked from commit 106e34fbd7)
This commit is contained in:
lxu75
2025-05-25 15:01:39 +08:00
committed by lxu75
parent 58d38aa730
commit f556f012b3
2 changed files with 37 additions and 26 deletions

View File

@@ -76,12 +76,12 @@ public class CorpusProcessKafkaProducer {
@KafkaListener(topics = "${spring.kafka.topic}", groupId = "${spring.kafka.group}")
public void listen(List<ConsumerRecord<String, Object>> recordMessages) {
long startTime = System.currentTimeMillis();
log.info("CorpusProcessKafkaProducer Received message: {}", recordMessages);
// 获取消息列表
int optimalThreadPoolSize = Runtime.getRuntime().availableProcessors() + 2;
log.info("获取的线程数:{}", optimalThreadPoolSize);
ExecutorService executor = Executors.newFixedThreadPool(optimalThreadPoolSize);
try {
log.info("CorpusProcessKafkaProducer Received message: {}", recordMessages);
// 获取消息列表
int optimalThreadPoolSize = Runtime.getRuntime().availableProcessors() + 2;
log.info("获取的线程数:{}", optimalThreadPoolSize);
ExecutorService executor = Executors.newFixedThreadPool(optimalThreadPoolSize);
if(CollectionUtils.isNotEmpty(recordMessages)){
log.info("CorpusProcessKafkaProducer List size: {}", recordMessages.size());
for (ConsumerRecord<String, Object> record : recordMessages) {
@@ -120,6 +120,8 @@ public class CorpusProcessKafkaProducer {
}
} catch (Exception e) {
log.error("CorpusProcessKafkaProducer 电话语料 解析JSON出错: {}", e.getMessage());
}finally {
executor.shutdown();
}
});
@@ -131,6 +133,8 @@ public class CorpusProcessKafkaProducer {
// 在这里可以添加对解析后的对象的进一步处理逻辑
} catch (Exception e) {
log.error("CorpusProcessKafkaProducer 电话语料 解析JSON出错: {}" , e.getMessage());
}finally {
executor.shutdown();
}
}

View File

@@ -125,27 +125,34 @@ public class TmOdsVdqwMessagearchivingServiceImpl extends ServiceImpl<TmOdsVdqwM
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(optimalThreadPoolSize); // 根据需求调整线程池大小
for (int i = 1; i <= totalPages; i++) {
int offset = (i - 1) * pageSize;
List<OdsVdqwMessageOTD> messageList = tmOdsVdqwMessagearchivingMapper.queryOdsVdqwMessageByData(statTime, endTime, offset, pageSize, retry);
// 处理查询到的数据
// 使用 CompletableFuture 并行处理
CompletableFuture<?>[] futures = messageList.stream()
.map(item -> CompletableFuture.runAsync(() -> {
try {
processItem(item, finalStatTime, finalEndTime);
} catch (Exception e) {
log.error("处理企微语料失败: FromUserId={}, AcceptUserId={}, 异常: {}",
item.getFromUserId(), item.getAcceptUserId(), e.getMessage(), e);
}
}, executor))
.toArray(CompletableFuture[]::new);
// 等待所有任务完成
CompletableFuture.allOf(futures).join();
}
// 关闭线程池
executor.shutdown();
log.info("企微数据跑批结束 耗时:{}",System.currentTimeMillis()-startTime);
try {
for (int i = 1; i <= totalPages; i++) {
int offset = (i - 1) * pageSize;
List<OdsVdqwMessageOTD> messageList = tmOdsVdqwMessagearchivingMapper.queryOdsVdqwMessageByData(statTime, endTime, offset, pageSize, retry);
// 处理查询到的数据
// 使用 CompletableFuture 并行处理
CompletableFuture<?>[] futures = messageList.stream()
.map(item -> CompletableFuture.runAsync(() -> {
try {
processItem(item, finalStatTime, finalEndTime);
} catch (Exception e) {
log.error("处理企微语料失败: FromUserId={}, AcceptUserId={}, 异常: {}",
item.getFromUserId(), item.getAcceptUserId(), e.getMessage(), e);
}
}, executor))
.toArray(CompletableFuture[]::new);
// 等待所有任务完成
CompletableFuture.allOf(futures).join();
}
// 关闭线程池
executor.shutdown();
} catch (Exception e) {
log.error("企微数据跑批异常",e);
} finally {
// 关闭线程池
executor.shutdown();
}
log.info("企微数据跑批结束 耗时:{}",System.currentTimeMillis()-startTime);
}
private void processItem(OdsVdqwMessageOTD item, String statTime, String endTime) {