gpt4 book ai didi

spring-boot - 在 Kafka 监听器线程中生成记录时获取 ProducerFencedException

转载 作者:行者123 更新时间:2023-12-05 05:50:11 26 4
gpt4 key购买 nike

我在 kafka 监听器容器内生成消息时遇到此异常。

javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=producer-tx-group.topicA.1
org.apache.kafka.common.errors.ProducerFencedException: The producer has been rejected from the broker because it tried to use an old epoch with the transactionalId

我的听众看起来像这样

@Transactional
@kafkaListener(...)
listener(topicA, message){
process(message)
produce(topicB, notification) // use Kafkatemplate to send the message
}

我的配置是这样的

    @Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(KafkaTransactionManager kafkaTransactionManager) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
return factory;
}

public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence);
DefaultKafkaProducerFactory<String, Object> factory = new
DefaultKafkaProducerFactory<>(props);
factory.setTransactionIdPrefix(transactionIdPrefix);
return factory;
}


@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
KafkaTemplate<String, Object> template = new KafkaTemplate<>(producerFactory());
return template;
}

@Bean
public KafkaTransactionManager kafkaTransactionManager() {
KafkaTransactionManager manager = new KafkaTransactionManager(producerFactory());
return manager;
}

我知道 Kafka 何时抛出 ProducerFencedException,但我想在这里弄清楚第二个具有相同 transaction.id 的生产者在哪里。

如果我在 Kafka 模板中设置唯一的事务前缀,它就可以正常工作

    @Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
KafkaTemplate<String, Object> template = new KafkaTemplate<>(producerFactory());
template.setTransactionIdPrefix(MessageFormat.format("{0}-{1}", transactionIdPrefix, UUID.randomUUID().toString()));
return template;
}

但我试图理解这里的异常,从另一个生产者开始使用相同的事务 ID 开始,按照 spring 文档 group.id/topic/partition

我只是在本地对单个应用程序实例进行尝试。

最佳答案

我找到了根本原因,我在这里创建了两个生产者实例

    public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence);
DefaultKafkaProducerFactory<String, Object> factory = new
DefaultKafkaProducerFactory<>(props);
factory.setTransactionIdPrefix(transactionIdPrefix);
return factory;
}

我缺少 Bean 配置。在生产因子上添加 @Bean 并在模板和 TM 中正确地自动连接它解决了这个问题。

关于spring-boot - 在 Kafka 监听器线程中生成记录时获取 ProducerFencedException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70551925/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com