gpt4 book ai didi

java - 将确认传递给 spring KafkaListener 消费者方法

转载 作者:行者123 更新时间:2023-12-02 01:08:14 29 4
gpt4 key购买 nike

我正在尝试关闭 kafka 中的自动提交,而是手动执行。为此,在我的 application.properties 中,我设置了 spring.kafka.properties.enable.auto.commit=false

我目前还有一个具有以下 header 的方法:

@KafkaListener(id="${"+ KafkaConfiguration.APP_REQUEST_ID +"}", topics = "${"+KafkaConfiguration.PPA_REQUEST_TOPIC +"}")
public void receive(@Payload String message,
@Headers MessageHeaders headers)

我的理解是,为了手动提交,我需要访问 Acknowledgement 对象,该对象将作为参数传递给我的 receive() 方法。我的问题:如果我将标题更改为

@KafkaListener(id="${"+ KafkaConfiguration.APP_REQUEST_ID +"}", topics = "${"+KafkaConfiguration.APP_REQUEST_TOPIC +"}")
public void receive(@Payload String message,
@Headers MessageHeaders headers,
Acknowledgment acknowledgment)

确认会自动传入,还是我需要进行其他更改?

最佳答案

是的,这样一个 Acknowledgment 实例将被传递到您的监听器方法中。成功处理收到的消息后,您应该调用 acknowledgement.acknowledge(); (仅当您想手动确认时才需要)

我还会切换到MANUAL确认模式并关闭自动提交(您已经做了),例如通过提供自定义 Spring Boot 配置类 - 也可以通过 application.properties 进行配置:

@Configuration
class KafkaConfiguration {

@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {

final Map<String, Object> consumerProperties = kafkaProperties.buildConsumerProperties();
consumerProperties.put(ENABLE_AUTO_COMMIT_CONFIG, false);

ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setAckMode(MANUAL);

return factory;
}
}
}

如果您不想手动确认,那么不同的确认模式可能更方便且更适合:

https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/ContainerProperties.AckMode.html

AckMode.RECORD 非常舒服,因为如果监听器的方法实现成功完成(不抛出异常),则传递到监听器方法的 Kafka 记录将自动被确认。

关于java - 将确认传递给 spring KafkaListener 消费者方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59740441/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com