修改kafka验证mock方法

This commit is contained in:
zren25
2025-05-15 16:34:36 +08:00
parent bd72767f99
commit 2032f59ce0
3 changed files with 15 additions and 12 deletions

View File

@@ -19,15 +19,14 @@ import java.util.Map;
@RefreshScope
public class KafkaConfig {
/**
*
// 第一个Kafka配置
@Bean(name = "dccKafkaTemplate")
public KafkaTemplate<String, String> dccKafkaTemplate(
@Value("${kafka.dcc.bootstrap-servers}") String bootstrapServers,
@Value("${kafka.dcc.producer.key-serializer}") String keySerializer,
@Value("${kafka.dcc.producer.value-serializer}") String valueSerializer) {
@Value("${spring.kafka.bootstrap-servers}") String bootstrapServers,
@Value("${analyticCenterKafka.producer.key-serializer}") String keySerializer,
@Value("${analyticCenterKafka.producer.value-serializer}") String valueSerializer) {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
@@ -36,7 +35,7 @@ public class KafkaConfig {
return new KafkaTemplate<>(factory);
}
*/
// 第二个Kafka配置
@Bean(name = "analyticCenterKafkaTemplate")
@@ -88,6 +87,7 @@ public class KafkaConfig {
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setBatchListener(true);
return factory;
}

View File

@@ -38,6 +38,7 @@ public class TestController {
private KafkaTemplate<String, String> kafkaProducer;
@Autowired
@Qualifier("dccKafkaTemplate")
private KafkaTemplate<String, String> dccKafkaProducer;
@PostMapping("/mockMq")

View File

@@ -284,13 +284,15 @@ public class TmNameplateCorpusServiceImpl extends ServiceImpl<TmNameplateCorpusM
@Override
public ResultMsg<Object> mockInsert(String data) {
if(StringUtils.isNotEmpty(data)){
TmNameplateCorpus analysisResp = JSONObject.parseObject(data, TmNameplateCorpus.class);
for(int i=0;i<100;i++){
analysisResp.setCustomerFlowId(analysisResp.getCustomerFlowId()+i);
tmNameplateCorpusMapper.insert(analysisResp);
}
return ResultMsg.ok(analysisResp);
List<TmNameplateCorpus> tmNameplateCorpusList = new ArrayList<>();
for(int i=0;i<100;i++){
TmNameplateCorpus analysisResp = JSONObject.parseObject(data, TmNameplateCorpus.class);
analysisResp.setCustomerFlowId(analysisResp.getCustomerFlowId()+i);
tmNameplateCorpusList.add(analysisResp);
}
this.saveBatch(tmNameplateCorpusList);
return ResultMsg.ok();
}
return ResultMsg.failed();
}