From e075ef3f01fa599800a3fdf407a19e8b7b1ea1a3 Mon Sep 17 00:00:00 2001 From: ZLI263 Date: Sat, 20 Sep 2025 20:59:21 +0800 Subject: [PATCH] =?UTF-8?q?=E9=93=AD=E7=89=8C=E5=9C=BA=E6=99=AF=E7=9A=84?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=E9=87=8D=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../center/config/ExecutorConfig.java | 129 ++++++++++++++++++ 1 file changed, 129 insertions(+) create mode 100644 ai-analytic-center-biz/src/main/java/com/volvo/ai/analytic/center/config/ExecutorConfig.java diff --git a/ai-analytic-center-biz/src/main/java/com/volvo/ai/analytic/center/config/ExecutorConfig.java b/ai-analytic-center-biz/src/main/java/com/volvo/ai/analytic/center/config/ExecutorConfig.java new file mode 100644 index 0000000..3247e21 --- /dev/null +++ b/ai-analytic-center-biz/src/main/java/com/volvo/ai/analytic/center/config/ExecutorConfig.java @@ -0,0 +1,129 @@ +package com.volvo.ai.analytic.center.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import javax.annotation.PreDestroy; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +@Slf4j +@Configuration +public class ExecutorConfig { + + @Value("${task.pool.corePoolSize}") + private int corePoolSize; + + @Value("${task.pool.maxPoolSize}") + private int maxPoolSize; + + @Value("${task.pool.keepAliveSeconds}") + private int keepAliveSeconds=6000000; + + @Value("${task.pool.queueCapacity}") + private int queueCapacity; + + private ThreadPoolTaskExecutor executor; + private ThreadPoolExecutor blockingExecutor; + + @Bean("threadPoolTaskExecutor") + @Primary + public ThreadPoolTaskExecutor corpusProcessExecutor() { + executor = new ThreadPoolTaskExecutor(); + + log.info("Thread-process-pool-Initializing: corePoolSize: {}, maxPoolSize: {}, queueCapacity: {}, keepAliveSeconds: {}",corePoolSize, maxPoolSize, queueCapacity, keepAliveSeconds); + // 设置核心线程数 Thread-process-pool-Initializing: corePoolSize: 10, maxPoolSize: 300, queueCapacity: 1000, keepAliveSeconds: 60 + executor.setCorePoolSize(corePoolSize); + // 设置最大线程数为5 + executor.setMaxPoolSize(maxPoolSize); + // 设置队列容量为10 + executor.setQueueCapacity(queueCapacity); + executor.setThreadNamePrefix("Thread-pool-lizh"); + // 使用阻塞策略:当队列满时,新任务会阻塞等待 + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.setKeepAliveSeconds(keepAliveSeconds); + // 允许核心线程超时,提高资源利用率 + executor.setAllowCoreThreadTimeOut(true); + executor.initialize(); + return executor; + } + + /** + * 使用Java自带的ThreadPoolExecutor实现真正的阻塞功能 + * 功能: + * 1. 最大线程数是50个 + * 2. 超过数量时排队等待,排队队列里最多100个对象,超过数量时阻塞 + */ + @Bean("blockingThreadPoolExecutor") + public ThreadPoolExecutor blockingThreadPoolExecutor() { + // 创建容量为100的阻塞队列 + BlockingQueue blockingQueue = new LinkedBlockingQueue<>(100); + + // 创建线程池执行器 + blockingExecutor = new ThreadPoolExecutor( + 10, // 核心线程数 + 50, // 最大线程数 + 60L, // 线程空闲时间 + TimeUnit.SECONDS, // 时间单位 + blockingQueue, // 阻塞队列,容量100 + r -> { // 线程工厂 + Thread t = new Thread(r, "Blocking-Thread-Pool-" + System.currentTimeMillis()); + t.setDaemon(false); + return t; + }, + new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略:调用者运行 + ); + + // 允许核心线程超时 + blockingExecutor.allowCoreThreadTimeOut(true); + + log.info("Java自带阻塞式线程池初始化完成 - 核心线程数: 10, 最大线程数: 50, 队列容量: 100"); + return blockingExecutor; + } + + @PreDestroy + public void destroy() { + // 关闭ThreadPoolTaskExecutor + if (executor != null) { + log.info("Thread-process-pool-ShuttingDown: {}", executor); + ThreadPoolExecutor threadPoolExecutor = this.executor.getThreadPoolExecutor(); + threadPoolExecutor.shutdown(); + + try { + // 等待所有任务完成,最多等待30秒 + if (!threadPoolExecutor.awaitTermination(30, TimeUnit.SECONDS)) { + log.warn("线程池未能在30秒内正常关闭,强制关闭"); + threadPoolExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("等待线程池关闭时被中断", e); + threadPoolExecutor.shutdownNow(); + } + } + + // 关闭Java自带的阻塞式线程池 + if (blockingExecutor != null) { + log.info("Java自带阻塞式线程池正在关闭: {}", blockingExecutor); + blockingExecutor.shutdown(); + + try { + // 等待所有任务完成,最多等待30秒 + if (!blockingExecutor.awaitTermination(30, TimeUnit.SECONDS)) { + log.warn("Java自带线程池未能在30秒内正常关闭,强制关闭"); + blockingExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("等待Java自带线程池关闭时被中断", e); + blockingExecutor.shutdownNow(); + } + } + } +}