定位太快消费的问题

This commit is contained in:
ZLI263
2025-09-20 21:59:40 +08:00
parent e075ef3f01
commit 812ab05ed0
2 changed files with 8 additions and 4 deletions

View File

@@ -39,8 +39,8 @@ public class NameplateKafkaConsumer {
@KafkaListener(topics = "${analyticCenterKafka.consumer.topic}", @KafkaListener(topics = "${analyticCenterKafka.consumer.topic}",//= smart_assistant_nameplate_topic
groupId = "${analyticCenterKafka.consumer.group}" , groupId = "${analyticCenterKafka.consumer.group}" ,//smart_assistant_nameplate_topic_group
containerFactory = "analyticCenterConsumerFactory", containerFactory = "analyticCenterConsumerFactory",
concurrency = "${analyticCenterKafka.consumer.concurrency}") concurrency = "${analyticCenterKafka.consumer.concurrency}")
public void listen(String recordMessages, Acknowledgment ack, public void listen(String recordMessages, Acknowledgment ack,
@@ -56,8 +56,9 @@ public class NameplateKafkaConsumer {
} }
try { try {
NameplateTableKafkaDTO tmNameplateCorpus = JSON.parseObject(recordMessages, NameplateTableKafkaDTO.class); NameplateTableKafkaDTO tmNameplateCorpus = JSON.parseObject(recordMessages, NameplateTableKafkaDTO.class);
if (!"INSERT".equals(tmNameplateCorpus.getType()) || !"UPDATE".equals(tmNameplateCorpus.getType()) || CollectionUtils.isEmpty(tmNameplateCorpus.getData())) { if ((!"INSERT".equals(tmNameplateCorpus.getType()) && !"UPDATE".equals(tmNameplateCorpus.getType()) )|| CollectionUtils.isEmpty(tmNameplateCorpus.getData())) {
ack.acknowledge(); ack.acknowledge();
log.info("nameplateKafkaConsumerMessage: 不符合预期的消息 {}", recordMessages);
return; return;
} }

View File

@@ -73,7 +73,10 @@ public class TmNameplateCorpusServiceImpl extends ServiceImpl<TmNameplateCorpusM
@Autowired @Autowired
@Resource(name = "threadPoolTaskExecutor") @Resource(name = "threadPoolTaskExecutor")
private ThreadPoolTaskExecutor executor; private ThreadPoolTaskExecutor executor;
private Semaphore semaphore = new Semaphore(6); // 限制并发数 @Value("${batch.threadNum}")
int threadNum ;
private Semaphore semaphore = new Semaphore(threadNum); // 限制并发数
@Value("${dify.corpus.portrait.oneToken}") @Value("${dify.corpus.portrait.oneToken}")
private String oneTokenPortrait; private String oneTokenPortrait;