铭牌场景的代码重构
This commit is contained in:
@@ -0,0 +1,129 @@
|
||||
package com.volvo.ai.analytic.center.config;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Primary;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
|
||||
import javax.annotation.PreDestroy;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Slf4j
|
||||
@Configuration
|
||||
public class ExecutorConfig {
|
||||
|
||||
@Value("${task.pool.corePoolSize}")
|
||||
private int corePoolSize;
|
||||
|
||||
@Value("${task.pool.maxPoolSize}")
|
||||
private int maxPoolSize;
|
||||
|
||||
@Value("${task.pool.keepAliveSeconds}")
|
||||
private int keepAliveSeconds=6000000;
|
||||
|
||||
@Value("${task.pool.queueCapacity}")
|
||||
private int queueCapacity;
|
||||
|
||||
private ThreadPoolTaskExecutor executor;
|
||||
private ThreadPoolExecutor blockingExecutor;
|
||||
|
||||
@Bean("threadPoolTaskExecutor")
|
||||
@Primary
|
||||
public ThreadPoolTaskExecutor corpusProcessExecutor() {
|
||||
executor = new ThreadPoolTaskExecutor();
|
||||
|
||||
log.info("Thread-process-pool-Initializing: corePoolSize: {}, maxPoolSize: {}, queueCapacity: {}, keepAliveSeconds: {}",corePoolSize, maxPoolSize, queueCapacity, keepAliveSeconds);
|
||||
// 设置核心线程数 Thread-process-pool-Initializing: corePoolSize: 10, maxPoolSize: 300, queueCapacity: 1000, keepAliveSeconds: 60
|
||||
executor.setCorePoolSize(corePoolSize);
|
||||
// 设置最大线程数为5
|
||||
executor.setMaxPoolSize(maxPoolSize);
|
||||
// 设置队列容量为10
|
||||
executor.setQueueCapacity(queueCapacity);
|
||||
executor.setThreadNamePrefix("Thread-pool-lizh");
|
||||
// 使用阻塞策略:当队列满时,新任务会阻塞等待
|
||||
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
|
||||
executor.setKeepAliveSeconds(keepAliveSeconds);
|
||||
// 允许核心线程超时,提高资源利用率
|
||||
executor.setAllowCoreThreadTimeOut(true);
|
||||
executor.initialize();
|
||||
return executor;
|
||||
}
|
||||
|
||||
/**
|
||||
* 使用Java自带的ThreadPoolExecutor实现真正的阻塞功能
|
||||
* 功能:
|
||||
* 1. 最大线程数是50个
|
||||
* 2. 超过数量时排队等待,排队队列里最多100个对象,超过数量时阻塞
|
||||
*/
|
||||
@Bean("blockingThreadPoolExecutor")
|
||||
public ThreadPoolExecutor blockingThreadPoolExecutor() {
|
||||
// 创建容量为100的阻塞队列
|
||||
BlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<>(100);
|
||||
|
||||
// 创建线程池执行器
|
||||
blockingExecutor = new ThreadPoolExecutor(
|
||||
10, // 核心线程数
|
||||
50, // 最大线程数
|
||||
60L, // 线程空闲时间
|
||||
TimeUnit.SECONDS, // 时间单位
|
||||
blockingQueue, // 阻塞队列,容量100
|
||||
r -> { // 线程工厂
|
||||
Thread t = new Thread(r, "Blocking-Thread-Pool-" + System.currentTimeMillis());
|
||||
t.setDaemon(false);
|
||||
return t;
|
||||
},
|
||||
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略:调用者运行
|
||||
);
|
||||
|
||||
// 允许核心线程超时
|
||||
blockingExecutor.allowCoreThreadTimeOut(true);
|
||||
|
||||
log.info("Java自带阻塞式线程池初始化完成 - 核心线程数: 10, 最大线程数: 50, 队列容量: 100");
|
||||
return blockingExecutor;
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void destroy() {
|
||||
// 关闭ThreadPoolTaskExecutor
|
||||
if (executor != null) {
|
||||
log.info("Thread-process-pool-ShuttingDown: {}", executor);
|
||||
ThreadPoolExecutor threadPoolExecutor = this.executor.getThreadPoolExecutor();
|
||||
threadPoolExecutor.shutdown();
|
||||
|
||||
try {
|
||||
// 等待所有任务完成,最多等待30秒
|
||||
if (!threadPoolExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
|
||||
log.warn("线程池未能在30秒内正常关闭,强制关闭");
|
||||
threadPoolExecutor.shutdownNow();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
log.error("等待线程池关闭时被中断", e);
|
||||
threadPoolExecutor.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
// 关闭Java自带的阻塞式线程池
|
||||
if (blockingExecutor != null) {
|
||||
log.info("Java自带阻塞式线程池正在关闭: {}", blockingExecutor);
|
||||
blockingExecutor.shutdown();
|
||||
|
||||
try {
|
||||
// 等待所有任务完成,最多等待30秒
|
||||
if (!blockingExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
|
||||
log.warn("Java自带线程池未能在30秒内正常关闭,强制关闭");
|
||||
blockingExecutor.shutdownNow();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
log.error("等待Java自带线程池关闭时被中断", e);
|
||||
blockingExecutor.shutdownNow();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user