java - KafkaListenerEndpointContainer cannot create a Kafka transaction with Spring Kafka

Reposted. Author: 行者123. Updated: 2023-12-02 02:45:01

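I am using spring-kafka 2.2.2.RELEASE (org.apache.kafka:kafka-clients:jar:2.0.1) with spring-boot 2.1.1. I cannot execute a transaction because my listener fails during partition assignment. I created the configuration suggested for an exactly-once consumer, and I am trying to configure a transactional listener container with exactly-once processing.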

I configured the producer and the consumer with a transaction manager; the producer uses a transactional ID and the consumer uses isolation.level=read_committed.

@Bean(name = "producerFactory")
public ProducerFactory<String, MyObject> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    configProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
    configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txApp");
    // Type arguments must match the declared return type for this to compile.
    DefaultKafkaProducerFactory<String, MyObject> producerFactory = new DefaultKafkaProducerFactory<>(configProps);
    producerFactory.setTransactionIdPrefix("tx.");

    return producerFactory;
}



@Bean
public KafkaTransactionManager<?, ?> kafkaTransactionManager() {
    KafkaTransactionManager<?, ?> kafkaTransactionManager = new KafkaTransactionManager<>(producerFactory());
    // ...
    return kafkaTransactionManager;
}

@Bean(name = "appTemplate")
public KafkaTemplate<String, MyObject> kafkaTemplate() {
    KafkaTemplate<String, MyObject> kafkaTemplate = new KafkaTemplate<>(producerFactory());
    return kafkaTemplate;
}

// Consumer

@Bean("kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        KafkaTransactionManager<?, ?> kafkaTransactionManager) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    // Use the injected transaction manager so the listener runs in a Kafka transaction.
    factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
    return factory;
}
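
The consumer properties behind kafkaConsumerFactory are not shown in the question. As a rough sketch only, the read_committed setting mentioned above could be supplied like this. The class name, the localhost:9092 address and the choice to disable auto-commit are assumptions for illustration, plain string keys are used so the snippet needs no Kafka dependency, and deserializer settings are left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the question's code.
class ReadCommittedConsumerProps {

    // Builds a minimal set of consumer properties using plain string keys.
    static Map<String, Object> build(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Only deliver records that belong to committed transactions.
        props.put("isolation.level", "read_committed");
        // Assumption: offsets are committed through the transaction, not automatically.
        props.put("enable.auto.commit", false);
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder address.
        build("localhost:9092", "ingest")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

A map like this could be passed to a DefaultKafkaConsumerFactory together with suitable deserializers.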

// In the consumer
@KafkaListener(topics = "myTopic", groupId = "ingest", concurrency = "4")
public void listener(@Payload MyObject message,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) throws ExecutionException, InterruptedException {

...

// In my producer

myTemplate.executeInTransaction(t-> t.send(kafkaConfig.getTopicName(), myMessage));

I expect to see the message arrive at my listener, but when I run the producer I get the following error:

22-07-2019 10:21:55.283 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR  o.a.k.c.c.i.ConsumerCoordinator.onJoinComplete request.id= request.caller=  - [Consumer clientId=consumer-2, groupId=ingest] User provided listener org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener failed on partition assignment 
org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 60000ms.
at org.springframework.kafka.transaction.KafkaTransactionManager.doBegin(KafkaTransactionManager.java:150)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:378)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:137)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener.onPartitionsAssigned(KafkaMessageListenerContainer.java:1657)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:283)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:343)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1175)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:719)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:676)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 60000ms.

Best answer

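Look at the server logs; most likely you do not have enough replicas to support transactions (the default is 3). If you are only testing, you can set it to 1.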

See the broker properties transaction.state.log.replication.factor and min.insync.replicas. Their descriptions from the Kafka documentation follow, in that order.

The replication factor for the transaction topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.

When a producer sets acks to "all" (or "-1"), this configuration specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend). When used together, min.insync.replicas and acks allow you to enforce greater durability guarantees. A typical scenario would be to create a topic with a replication factor of 3, set min.insync.replicas to 2, and produce with acks of "all". This will ensure that the producer raises an exception if a majority of replicas do not receive a write.
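
To make the constraint concrete, here is a minimal sketch that compares the transaction log settings with the number of brokers. The class and method names are hypothetical. It also reads transaction.state.log.min.isr, which the answer does not mention; as far as I know that broker setting overrides min.insync.replicas for the transaction state topic and defaults to 2. It only inspects a Properties object and does not talk to a broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for sanity-checking broker settings before testing transactions.
class TxLogSettingsCheck {

    // Returns a list of problems; an empty list means the settings fit the cluster size.
    static List<String> problems(Properties broker, int brokerCount) {
        // Fall back to the broker defaults (3 and 2) when a property is not set.
        int replicationFactor = Integer.parseInt(
                broker.getProperty("transaction.state.log.replication.factor", "3"));
        int minIsr = Integer.parseInt(
                broker.getProperty("transaction.state.log.min.isr", "2"));

        List<String> out = new ArrayList<>();
        if (replicationFactor > brokerCount) {
            out.add("transaction.state.log.replication.factor=" + replicationFactor
                    + " exceeds broker count " + brokerCount);
        }
        if (minIsr > Math.min(replicationFactor, brokerCount)) {
            out.add("transaction.state.log.min.isr=" + minIsr
                    + " cannot be satisfied by the available replicas");
        }
        return out;
    }

    public static void main(String[] args) {
        // Defaults on a single broker: both checks report a problem.
        problems(new Properties(), 1).forEach(System.out::println);

        // Test-only settings for a single broker, as the answer suggests.
        Properties singleBroker = new Properties();
        singleBroker.setProperty("transaction.state.log.replication.factor", "1");
        singleBroker.setProperty("transaction.state.log.min.isr", "1");
        System.out.println(problems(singleBroker, 1)); // prints []
    }
}
```

On a real single-broker test setup, the equivalent step would be to put the two properties with value 1 into the broker's server.properties and restart the broker.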

Regarding "java - KafkaListenerEndpointContainer cannot create a Kafka transaction with Spring Kafka", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/57151440/
