删除无用代码
This commit is contained in:
@@ -1,54 +0,0 @@
|
||||
package com.volvo.ai.analytic.center.mq;
|
||||
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
@Component
|
||||
public class ConsumerStateManager {
|
||||
// 使用 ThreadLocal 存储每个线程的 Consumer 实例
|
||||
private final ThreadLocal<Consumer<?, ?>> consumerThreadLocal = new ThreadLocal<>();
|
||||
private final AtomicInteger messageCount = new AtomicInteger(0);
|
||||
private volatile boolean paused = false;
|
||||
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
|
||||
|
||||
// 绑定当前线程的 Consumer
|
||||
public void bindConsumer(Consumer<?, ?> consumer) {
|
||||
consumerThreadLocal.set(consumer);
|
||||
}
|
||||
|
||||
// 获取当前线程绑定的 Consumer
|
||||
public Consumer<?, ?> getKafkaConsumer() {
|
||||
return consumerThreadLocal.get();
|
||||
}
|
||||
|
||||
// 更新计数器并检查是否需要暂停
|
||||
public void checkAndPause() {
|
||||
if (messageCount.incrementAndGet() >= 30 && !paused) {
|
||||
paused = true;
|
||||
Consumer<?, ?> consumer = getKafkaConsumer();
|
||||
if (consumer != null) {
|
||||
consumer.pause(consumer.assignment());
|
||||
scheduler.schedule(this::resume, 2, TimeUnit.MINUTES);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 恢复消费
|
||||
private void resume() {
|
||||
paused = false;
|
||||
messageCount.set(0);
|
||||
Consumer<?, ?> consumer = getKafkaConsumer();
|
||||
if (consumer != null) {
|
||||
consumer.resume(consumer.assignment());
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isPaused() {
|
||||
return paused;
|
||||
}
|
||||
}
|
||||
@@ -1,37 +0,0 @@
|
||||
package com.volvo.ai.analytic.center.mq;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.util.concurrent.*;
|
||||
@Slf4j
|
||||
@Component
|
||||
public class KafkaMessageScheduler {
|
||||
|
||||
private final BlockingQueue<Runnable> messageQueue = new LinkedBlockingQueue<>();
|
||||
|
||||
@PostConstruct
|
||||
public void startScheduler() {
|
||||
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
|
||||
scheduler.scheduleAtFixedRate(() -> {
|
||||
try {
|
||||
Runnable task = messageQueue.poll(1, TimeUnit.SECONDS);
|
||||
if (task != null) {
|
||||
task.run();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}, 0, 4, TimeUnit.SECONDS); // 每隔 4 秒执行一次
|
||||
}
|
||||
|
||||
public boolean submit(Runnable task) {
|
||||
if (messageQueue.remainingCapacity() == 0) {
|
||||
log.warn("Kafka message queue is full, task rejected.");
|
||||
return false;
|
||||
}
|
||||
log.info("messageQueueSize: {}",messageQueue.size());
|
||||
return messageQueue.offer(task);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user