gpt4 book ai didi

java - 如何在 spring-kafka 中的每条 kafka 消息后提交?

转载 作者:行者123 更新时间:2023-11-30 09:59:50 24 4
gpt4 key购买 nike

在我们的应用程序中,我们使用 kafka 消费者来确定发送电子邮件。

前几天我们遇到了一个问题,kafka 分区在读取和处理其所有记录之前超时。结果,它循环回到分区的开始,无法完成它收到的记录集,并且循环开始后生成的新数据从未得到处理。

我的团队建议我们可以在读取每条消息后告诉 Kafka 提交,但是我不知道如何从 Spring-kakfa 做到这一点。

应用程序使用 spring-kafka 2.1.6,消费者代码有点像这样。

@KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.groupId}")
public void consume(String message, @Header("kafka_offset") int offSet) {
try{
EmailData data = objectMapper.readValue(message, EmailData.class);
if(isEligableForEmail(data)){
emailHandler.sendEmail(data)
}
} catch (Exception e) {
log.error("Error: "+e.getMessage(), e);
}
}

注意:sendEmail 函数使用 CompletableFutures,因为它必须在发送电子邮件之前调用不同的 API。

配置:(消费者的 yaml 文件片段,以及生产者的一部分)

consumer:
max.poll.interval.ms: 3600000
producer:
retries: 0
batch-size: 100000
acks: 0
buffer-memory: 33554432
request.timeout.ms: 60000
linger.ms: 10
max.block.ms: 5000

最佳答案

如果你想要手动确认,那么你可以在方法参数中提供确认

@KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.groupId}")
public void consume(String message, Acknowledgment ack, @Header("kafka_offset") int offSet) {

When using manual AckMode, you can also provide the listener with the Acknowledgment. The following example also shows how to use a different container factory.

来自文档的示例 @KafkaListener Annotation

@KafkaListener(id = "cat", topics = "myTopic",
containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();

}

关于java - 如何在 spring-kafka 中的每条 kafka 消息后提交?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58566465/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com