gpt4 book ai didi

kotlin - 使用自定义(ConsumerAware)错误处理程序时如何查找和提交?

转载 作者:行者123 更新时间:2023-12-03 08:20:16 26 4
gpt4 key购买 nike

情况

我正在尝试为Spring-Kafka(2.3.3.RELEASE)批处理消息侦听器实现自定义错误处理程序。我有裸露的骨骼配置:

val factory = ConcurrentKafkaListenerContainerFactory<Int, String>()

factory.consumerFactory = kafkaConsumerFactory
factory.isBatchListener = true
factory.setBatchErrorHandler(MyErrorHandler())

无需任何额外的使用者配置。请注意,“从2.3版开始,框架将enable.auto.commit设置为false”。

我正在使用“了解消费者”的 @KafkaListener,而我的自定义错误处理程序也实现了 ConsumerAwareBatchErrorHandler

在我的错误处理程序中,我使用 Consumer.seek(topicPartition, offset)方法来寻找下一个或寻找当前的偏移量。

问题在于,每当调用错误处理程序并且我手动寻求某个偏移量时,该偏移量似乎都不会提交给Kafka。通过以下事实证明了这一点:在应用程序重新启动后,它轮询相同的(以前失败的)记录(即使代码试图查找下一个)。
更令我困惑的是,在应用程序运行时,它似乎确实使用了 seek()的偏移量并轮询新记录。

我尝试使用其他 AckMode,例如 MANUALMANUAL_IMMIDIATE,但这不会更改观察到的行为。此外,文档还说明了这些模式:

MANUAL, and MANUAL_IMMEDIATE require the listener to be an AcknowledgingMessageListener or a BatchAcknowledgingMessageListener



显然我没有使用。

如果已经找到一个“修复程序”,即在 consumer.commitSync()之后调用 .seek(),它似乎立即提交了偏移量。这也由日志记录备份:
DEBUG 13784 --- [ntainer#2-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=kafka-retry-test] Committed offset 28 for partition test-retry-batch-custom-e-0
问题

为什么只调用 consumer.seek(...)不会向kafka提交偏移量,我是否缺少一些关键配置?调用 .commitSync()似乎是一种技巧,文档中没有描述它在任何地方都不需要。

最佳答案

搜索不提交偏移量,它只是将当前使用者定位在该点上。您需要调用syncCommitasyncCommit

但是,从versionn 2.3.2开始(当前版本为2.3.4),您可以从isAckAfterHandle()返回true:

/**
* Return true if the offset should be committed for a handled error (no exception
* thrown).
* @return true to commit.
* @since 2.3.2
*/
default boolean isAckAfterHandle() {
// TODO: Default true in the next release.
return false;
}

容器将为您完成提交。

参见 the documentation

Starting with version 2.3.2, these interfaces have a default method isAckAfterHandle() which is called by the container to determine whether the offset(s) should be committed if the error handler returns without throwing an exception. This returns false by default, for backwards compatibility. In most cases, however, we expect that the offset should be committed. For example, the SeekToCurrentErrorHandler returns true if a record is recovered (after any retries, if so configured). In a future release, we expect to change this default to true.



但是,如果您使用手动确认,则错误处理程序必须手动提交。

关于kotlin - 使用自定义(ConsumerAware)错误处理程序时如何查找和提交?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59214040/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com