修改kafka验证

This commit is contained in:
zren25
2025-05-16 18:32:37 +08:00
parent abc35c0b68
commit ac53ea94f0
2 changed files with 20 additions and 37 deletions

View File

@@ -32,7 +32,6 @@ public class KafkaMessageScheduler {
return false; return false;
} }
log.info("messageQueueSize: {}",messageQueue.size()); log.info("messageQueueSize: {}",messageQueue.size());
messageQueue.offer(task); return messageQueue.offer(task);
return true;
} }
} }

View File

@@ -2,18 +2,14 @@
package com.volvo.ai.analytic.center.mq; package com.volvo.ai.analytic.center.mq;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.volvo.ai.analytic.center.dto.corpus.NameplateTableKafkaDTO; import com.volvo.ai.analytic.center.dto.corpus.NameplateTableKafkaDTO;
import com.volvo.ai.analytic.center.service.TmNameplateCorpusService; import com.volvo.ai.analytic.center.service.TmNameplateCorpusService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.context.config.annotation.RefreshScope; import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.KafkaHeaders;
@@ -22,8 +18,6 @@ import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* @ClassName NameplateKafkaConsumer * @ClassName NameplateKafkaConsumer
@@ -42,16 +36,11 @@ public class NameplateKafkaConsumer {
@Autowired @Autowired
private TmNameplateCorpusService tmNameplateCorpusService; private TmNameplateCorpusService tmNameplateCorpusService;
private final ObjectMapper objectMapper = new ObjectMapper();
@Resource @Resource
private RocketMQTemplate rocketMqTemplate; private RocketMQTemplate rocketMqTemplate;
@Autowired
@Qualifier("kafkaTaskExecutor")
public ExecutorService kafkaTaskExecutor;
@Autowired
public KafkaMessageScheduler kafkaMessageScheduler;
@KafkaListener(topics = "${analyticCenterKafka.consumer.topic}", @KafkaListener(topics = "${analyticCenterKafka.consumer.topic}",
groupId = "${analyticCenterKafka.consumer.group}" , groupId = "${analyticCenterKafka.consumer.group}" ,
containerFactory = "analyticCenterConsumerFactory", containerFactory = "analyticCenterConsumerFactory",
@@ -63,35 +52,30 @@ public class NameplateKafkaConsumer {
log.info("nameplateKafkaConsumer 当前线程: {}, 线程ID: {},计数:{}", Thread.currentThread().getName(), Thread.currentThread().getId()); log.info("nameplateKafkaConsumer 当前线程: {}, 线程ID: {},计数:{}", Thread.currentThread().getName(), Thread.currentThread().getId());
log.info("nameplateKafkaConsumerMessage: {}", recordMessages); log.info("nameplateKafkaConsumerMessage: {}", recordMessages);
// 初始化绑定 Consumer // 初始化绑定 Consumer
if (StringUtils.isEmpty(recordMessages)) { if (StringUtils.isEmpty(recordMessages)) {
ack.acknowledge(); ack.acknowledge();
return; return;
} }
try {
NameplateTableKafkaDTO tmNameplateCorpus = JSON.parseObject(recordMessages, NameplateTableKafkaDTO.class); NameplateTableKafkaDTO tmNameplateCorpus = JSON.parseObject(recordMessages, NameplateTableKafkaDTO.class);
if (!"INSERT".equals(tmNameplateCorpus.getType()) || CollectionUtils.isEmpty(tmNameplateCorpus.getData())) { if (!"INSERT".equals(tmNameplateCorpus.getType()) || CollectionUtils.isEmpty(tmNameplateCorpus.getData())) {
ack.acknowledge(); ack.acknowledge();
return; return;
}
boolean isSubmit = kafkaMessageScheduler.submit(() -> {
try {
// 真正的业务逻辑在这里执行
tmNameplateCorpus.getData().forEach(nameplate -> tmNameplateCorpusService.processItem(nameplate));
} catch (Exception e) {
log.error("消息处理失败", e);
} }
});
if (isSubmit){
log.info("nameplateKafkaConsumeracknowledge{}");
ack.acknowledge(); // 手动提交 offset
}
log.info("nameplateKafkaConsumer消息处理完成耗时{}", System.currentTimeMillis() - startTime);
tmNameplateCorpus.getData().forEach(nameplate -> tmNameplateCorpusService.processItem(nameplate));
log.info("nameplateKafkaConsumeracknowledge{}",partitionId, offset);
// 手动提交 offset
} catch (Exception e) {
log.info("nameplateKafkaConsumerFailed to process message: {}", e);
}finally {
try {
ack.acknowledge(); // 提交 offset
} catch (IllegalStateException e) {
log.warn("Offset 已提交,跳过重复提交");
}
}
log.info("nameplateKafkaConsumer消息处理开始耗时{}", System.currentTimeMillis() - startTime);
} }
} }