java - Update the Kafka commit offset after a successful batch insert

Reposted. Author: 行者123. Updated: 2023-11-30 07:42:37

I have a spring-kafka consumer that reads records and hands them off to a cache. A scheduled task periodically flushes the records in the cache. I only want to update the COMMIT OFFSET after the batch has been successfully saved to the database. I tried passing the acknowledgment object to the cache service so it can call the acknowledge method, as shown below.

public class KafkaConsumer {

    @KafkaListener( topicPattern = "${kafka.topicpattern}", containerFactory = "kafkaListenerContainerFactory" )
    public void receive( ConsumerRecord<String, String> record, Acknowledgment acknowledgment ) {
        cacheService.add( record.value(), acknowledgment );
    }
}

public class CacheService {
    // concurrency handling has been left out in favor of readability

    public void add( String record, Acknowledgment acknowledgment ) {
        this.records.add( record );
        this.lastAcknowledgment = acknowledgment;
    }

    public void saveBatch() { // called by a scheduled task
        if( records.size() == BATCH_SIZE ) {
            // perform batch insert into database
            this.lastAcknowledgment.acknowledge();
            this.records.clear();
        }
    }
}

The AckMode is set as follows:

factory.getContainerProperties().setAckMode( AbstractMessageListenerContainer.AckMode.MANUAL );

and auto-commit is disabled:

config.put( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false );
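For context, a minimal sketch of how these two settings are typically wired together in a spring-kafka 2.1.x container factory; the surrounding bean setup and the `DefaultKafkaConsumerFactory` wiring are assumptions here, not taken from the question:

```java
// Illustrative wiring only; property values and bean setup are assumptions.
Map<String, Object> config = new HashMap<>();
// ... bootstrap servers, group id, deserializers ...
config.put( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false ); // disable auto-commit

ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory( new DefaultKafkaConsumerFactory<>( config ) );
// with MANUAL ack mode, offsets are committed only when
// Acknowledgment.acknowledge() has been called
factory.getContainerProperties().setAckMode( AbstractMessageListenerContainer.AckMode.MANUAL );
```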

Even though the acknowledge method is called, the commit offset is not updated. What is the best way to update the commit offset after the records have been persisted?

I am using spring-kafka 2.1.7.RELEASE.


EDIT: After @GaryRussell confirmed that acknowledgments made by an external thread are executed by the consumer thread during the next poll, I re-examined my code and found a bug in how the last acknowledgment object was being set. After fixing it, the commit offset is updated as expected, so this issue has been resolved. However, I cannot mark this question as answered.
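Following up on that edit: a common source of exactly this kind of bug is non-atomic publication of the latest Acknowledgment between the listener thread and the scheduled task. Below is a minimal, runnable sketch of the pattern; the `Ack` interface is a stand-in for spring-kafka's `Acknowledgment` so the snippet compiles without the dependency, and `BATCH_SIZE` and the flush logic are illustrative, not the author's actual code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Stand-in for org.springframework.kafka.support.Acknowledgment,
// so this sketch runs without spring-kafka on the classpath.
interface Ack {
    void acknowledge();
}

class CacheService {
    static final int BATCH_SIZE = 3; // small value for illustration only

    private final List<String> records = new ArrayList<>();
    // AtomicReference makes the hand-off of the newest acknowledgment
    // from the listener thread to the scheduled task safely visible.
    private final AtomicReference<Ack> lastAcknowledgment = new AtomicReference<>();

    public synchronized void add(String record, Ack acknowledgment) {
        records.add(record);
        lastAcknowledgment.set(acknowledgment);
    }

    // Called by the scheduled task; returns true if a batch was flushed.
    public synchronized boolean saveBatch() {
        if (records.size() < BATCH_SIZE) {
            return false;
        }
        // ... perform the batch insert into the database here ...
        Ack ack = lastAcknowledgment.get();
        if (ack != null) {
            // with spring-kafka, this is executed by the consumer
            // thread during the next poll
            ack.acknowledge();
        }
        records.clear();
        return true;
    }
}
```

Note the sketch flushes whenever the cache has reached `BATCH_SIZE` (`<` check) rather than only when it exactly equals it, which avoids silently skipping the flush if two records arrive between scheduler runs.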

Best Answer

Here is the problem: the consumer thread is responsible for committing offsets. During a poll, the consumer thread commits the offsets of the previous batch.

Because AUTO_COMMIT is false in your case and lastAcknowledgment.acknowledge() is not taking effect, the commit offset is not updated.

There is only one way to do this: once the polled records are received, run the scheduled task asynchronously, hold the consumer thread, and commit the offsets after the async task completes. See this answer for reference.

Note: if you hold the consumer thread for more than 5 minutes, a rebalance will occur. From the Kafka documentation:

The new Java Consumer now supports heartbeating from a background thread. There is a new configuration max.poll.interval.ms which controls the maximum time between poll invocations before the consumer will proactively leave the group (5 minutes by default). The value of the configuration request.timeout.ms must always be larger than max.poll.interval.ms because this is the maximum time that a JoinGroup request can block on the server while the consumer is rebalancing, so we have changed its default value to just above 5 minutes. Finally, the default value of session.timeout.ms has been adjusted down to 10 seconds, and the default value of max.poll.records has been changed to 500.
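If the batch flush can keep acknowledgments pending for a long time, those limits can be tuned. A hedged example using the standard kafka-clients ConsumerConfig constants; the specific values are illustrative, not recommendations from the answer:

```java
// Illustrative values only; tune to your batch cadence.
// More headroom before the consumer is considered failed and rebalanced:
config.put( ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000 ); // 10 minutes
// Fewer records per poll shortens the time between poll invocations:
config.put( ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100 );
```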

A special note for spring-kafka 2.1.5 and later:

Acknowledgments performed on an external thread are executed by the consumer thread just before the next poll. Thanks to @Gary Russell for this information.

Regarding "java - Update the Kafka commit offset after a successful batch insert", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/54383839/
