gpt4 book ai didi

java - 重新消费未提交偏移量的消息

转载 作者:行者123 更新时间:2023-12-02 02:04:17 24 4
gpt4 key购买 nike

我有一个自定义的 Kafka Consumer,我用它来向 REST API 发送一些请求。根据 API 的响应,我要么提交偏移量,要么跳过消息而不提交。

最小示例:

while (true) {

ConsumerRecords<String, Object> records = consumer.poll(200);
for (ConsumerRecord<String, Object> record : records) {

// Sending a POST request and retrieving the answer
// ...

if (responseCode.startsWith("2")) {
try {
consumer.commitSync();
} catch(CommitFailedException ex) {
ex.printStackTrace();
}
} else {
// Do Nothing
}
}
}

现在,当 REST API 的响应不以 2 开头时,偏移量不会提交,但消息不会重新使用。如何强制消费者重新使用未提交偏移量的消息?

最佳答案

如果您打算使用seek(),请确保您的数据是幂等的。由于您有选择地提交偏移量,因此遗漏的记录可能会在提交(成功处理)的记录之前。如果您执行seek() - 它将您的groupId的指针移动到未提交的偏移量并开始重播,您也将获得那些成功处理的消息。它还有可能成为无限循环。

或者,您可以将不成功记录的元数据保存在内存或数据库中,并从头开始重播主题“poll(retention.ms)”,以便重播所有记录,但添加一个过滤器,仅通过 API 处理元数据与您之前保存的内容。每小时或几个小时执行一次批处理。

关于java - 重新消费未提交偏移量的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51040666/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com