增加日志,增加mock方法

This commit is contained in:
zren25
2025-05-15 14:17:33 +08:00
parent 6cfcade1f5
commit 31efa3dc0c
3 changed files with 25 additions and 10 deletions

View File

@@ -40,9 +40,9 @@ public class KafkaConfig {
// 第二个Kafka配置
@Bean(name = "analyticCenterKafkaTemplate")
public KafkaTemplate<String, String> analyticCenterKafkaTemplate(
@Value("${kafka.analyticCenter.bootstrap-servers}") String bootstrapServers,
@Value("${kafka.analyticCenter.producer.key-serializer}") String keySerializer,
@Value("${kafka.analyticCenter.producer.value-serializer}") String valueSerializer) {
@Value("${analyticCenterKafka.bootstrap-servers}") String bootstrapServers,
@Value("${analyticCenterKafka.producer.key-serializer}") String keySerializer,
@Value("${analyticCenterKafka.producer.value-serializer}") String valueSerializer) {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
@@ -72,10 +72,10 @@ public class KafkaConfig {
*/
@Bean(name = "analyticCenterConsumerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> analyticCenterConsumerFactory(
@Value("${kafka.analyticCenter.bootstrap-servers}") String bootstrapServers,
@Value("${kafka.analyticCenter.consumer.group}") String groupId,
@Value("${kafka.analyticCenter.consumer.key-deserializer}") String keyDeserializer,
@Value("${kafka.analyticCenter.consumer.value-deserializer}") String valueDeserializer) {
@Value("${analyticCenterKafka.bootstrap-servers}") String bootstrapServers,
@Value("${analyticCenterKafka.consumer.group}") String groupId,
@Value("${analyticCenterKafka.consumer.key-deserializer}") String keyDeserializer,
@Value("${analyticCenterKafka.consumer.value-deserializer}") String valueDeserializer) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

View File

@@ -2,6 +2,7 @@ package com.volvo.ai.analytic.center.controller;
import com.alibaba.fastjson.JSONObject;
import com.volvo.ai.analytic.center.dto.corpus.AicorpusTelephoneDTO;
import com.volvo.ai.analytic.center.service.AiAnalysisRequestLogsService;
import com.volvo.common.core.util.ResultMsg;
import io.swagger.annotations.Api;
@@ -36,6 +37,9 @@ public class TestController {
@Qualifier("analyticCenterKafkaTemplate")
private KafkaTemplate<String, String> kafkaProducer;
@Autowired
private KafkaTemplate<String, String> dccKafkaProducer;
@PostMapping("/mockMq")
@ApiOperation(value = "补偿处理消息")
public ResultMsg<Object> mockMq(@RequestBody String message) {
@@ -59,7 +63,16 @@ public class TestController {
kafkaProducer.send(topic,msg);
return ResultMsg.ok("ok");
}
@PostMapping("/mockDccKafka")
@ApiOperation(value = "补偿处理消息")
public ResultMsg<Object> mockDccKafka(@RequestBody AicorpusTelephoneDTO message) {
    // Test endpoint: publish 20 copies of the payload to the DCC Kafka topic,
    // each copy tagged with a distinct sourceId ("0000010" .. "00000119").
    // NOTE(review): the same DTO instance is mutated and re-serialized per
    // iteration — presumably intentional for mock traffic; confirm if exact
    // per-message snapshots matter.
    for (int index = 0; index < 20; index++) {
        message.setSourceId("000001" + index);
        String payload = JSONObject.toJSONString(message);
        dccKafkaProducer.send("topic_voc_covert_text_log", payload);
    }
    return ResultMsg.ok("ok");
}
}

View File

@@ -44,14 +44,15 @@ public class NameplateKafkaConsumer {
private RocketMQTemplate rocketMqTemplate;
@KafkaListener(topics = "${kafka.analyticCenter.consumer.topic}",
groupId = "${kafka.analyticCenter.consumer.group}" ,
@KafkaListener(topics = "${analyticCenterKafka.consumer.topic}",
groupId = "${analyticCenterKafka.consumer.group}" ,
containerFactory = "analyticCenterConsumerFactory",
concurrency = "3")
public void listen(String recordMessages, Acknowledgment ack) {
long startTime = System.currentTimeMillis();
log.info("nameplateKafkaConsumer message: {}", recordMessages);
log.info("nameplateKafkaConsumer 当前线程: {}, 线程ID: {}", Thread.currentThread().getName(), Thread.currentThread().getId());
log.info("nameplateKafkaConsumerMessage: {}", recordMessages);
if(StringUtils.isNotEmpty(recordMessages)){
try {
NameplateTableKafkaDTO tmNameplateCorpus = JSON.parseObject(recordMessages, NameplateTableKafkaDTO.class);
@@ -67,6 +68,7 @@ public class NameplateKafkaConsumer {
}
// 手动提交 offset
if (ack != null) {
log.info("nameplateKafkaConsumer ack.acknowledge");
ack.acknowledge();
}
}else {