gpt4 book ai didi

java - 仅 Spring Kafka 事务消费者

转载 作者:行者123 更新时间:2023-11-30 06:22:57 25 4
gpt4 key购买 nike

引用spring-kafka doc似乎可以配置监听器容器将偏移量提交绑定(bind)到事务。由于本段是在 Producer section 下报告的,从上下文中我可以推测以事务方式配置监听器容器是在提交生产者事务之前执行验证的一种方法。

如果只消费呢?是否可以使用 kafkamessagelistenercontainer (使用 kafkatransactionmanager 配置)以事务方式提交已消费消息的偏移量?我想不会,因为 kafka Producer 和 kafkaconsumer 不是相同的 api

最佳答案

尚不完全清楚您想要实现的目标。

Kafka 中事务的目的是任何生产者发送和消费者偏移量提交都是原子的(全进或全出)。

容器不知道监听器是否实际发布了任何消息,因此无论如何它都会以相同的方式工作(将消费者偏移量发送到事务)。

如果您想要发布一些属于事务的消息,您可以通过使用具有不同(非事务性)生产者工厂的 KafkaTemplate 来实现。这样,模板将不会找到交易的生产者并使用第二个生产者工厂中的生产者。如果事务回滚,发布的消息也不会回滚。

编辑

如果监听器容器配置了其他事务管理器(KafkaTransactionManager 除外,包括链式事务管理器),则容器无法将偏移量发送到事务,因为它不能访问交易的Producer。您可以使用 KafkaTemplate 将偏移量发送到监听器内的事务;它将使用交易的生产者。

但是,有两种方法可以满足您的需求。

  1. 使用 KTM 配置容器,并使用 @Transactional 注释您的监听器 - 使用 DB TxM - 对于正常情况,DB tx 将提交,并且容器将提交。如果出现故障,DB 将回滚,Kafka TX 将回滚。

  2. 不要使用 Kafka TM 并配置 new SeekToCurrentErrorHandler如果监听器抛出异常,容器将重放失败的传递。

关于java - 仅 Spring Kafka 事务消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47752678/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com