gpt4 book ai didi

java - 不确认消息是否会导致将其重新传递给 Kafka 消费者?

转载 作者:行者123 更新时间:2023-12-02 02:10:51 25 4
gpt4 key购买 nike

我正在运行一个 Spring Cloud Stream 应用程序,在其中从 Kafka 主题读取事务,处理该事务,然后将它们发送到 IBM MQ。我正在尝试处理与 IBM MQ 没有连接时的错误,以防止事务丢失。在这种情况下,jms 模板将引发异常,并且流监听器将不会提交事务。预期的行为是事务保留在 Kafka 主题中,并且流监听器再次读取它。然而,该消息似乎只被消耗一次,并且没有发生“回滚”。为此,我的配置如下:

spring:
cloud:
stream:
kafka:
bindings:
input:
consumer:
auto-commit-offset: false
bindings:
input:
destination: kafka_topic
brokers: localhost:9092

这是代码:

    public void handleMessage(Message<TransactionMessage> request,  @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {
TransactionMessage message = request.getPayload();
System.out.println("Consumed a message");
try {
executionFlow.execute(message); // here the jmsTemplate throws an exception
System.out.println("doing the ack");
acknowledgment.acknowledge();
}
catch (RuntimeException e) {
System.out.println("did not send to MQ");
}
}

executionFlow 调用的 jmsTemplate 的代码:

    public void sendMessage(String messageTarget) {

System.out.println("i am trying to send to MQ");
try {
jmsTemplate.convertAndSend(destinationTopicQueue, messageTarget);
} catch (Exception e) {
throw new RuntimeException("jmsTemplate failed to send to IBM MQ");
}
}

以下是我关闭与 IBM MQ 的连接时的输出:

Consumed a message
i am trying to send to MQ
did not send to MQ

最佳答案

   catch (RuntimeException e) {
System.out.println("did not send to MQ");
}

您需要重新抛出异常才能导致回滚。

您还需要在 Binder 中启用 Kafka 事务。

参见Kafka Binder Properties .

spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix

Enables transactions in the binder. See transaction.id in the Kafka documentation and Transactions in the spring-kafka documentation. When transactions are enabled, individual producer properties are ignored and all producers use the spring.cloud.stream.kafka.binder.transaction.producer.* properties.

Default null (no transactions)

如果你也向kafka发送数据,则需要事务性生产者

spring.cloud.stream.kafka.binder.transaction.producer.*

Global producer properties for producers in a transactional binder. See spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix and Kafka Producer Properties and the general producer properties supported by all binders.

Default: See individual producer properties.

关于java - 不确认消息是否会导致将其重新传递给 Kafka 消费者?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57328625/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com