gpt4 book ai didi

java - 卡夫卡与Java : how to re-read data

转载 作者:行者123 更新时间:2023-12-02 10:27:02 25 4
gpt4 key购买 nike

我在使用 kafka API 时遇到以下问题。我设置了我的消费者:

props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, configuration.batchSize);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

然后

while(true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
try {
//do some update in DB in a transaction
consumer.commitSync();
} catch (Exception e) {
}

我想从 Kafka 读取数据,并根据这些数据更新数据库。但如果更新失败,我想重试,直到成功为止。所以我想将数据库事务应用到kafka,i.a.如果我的数据库事务正常,则移动 kafka 指针,但如果失败,则从同一位置重试。

在我的代码中,

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(

没有按预期工作,这意味着“如果kafka崩溃,则从提交的位置重新启动”。但是当我的数据库事务失败时,即使我不 commitAsync() 指针也会向前移动。

我的问题是:有没有一种简单的方法可以将kafka指针位置反转到上次轮询的位置。

我已经注意到 API 中有这个

public void seek(TopicPartition partition,
long offset);

但这需要手动维护分区列表及其偏移量,我想有更简单、更优雅的东西吗?

最佳答案

1) 由于 Consumer.poll 位于循环内,因此无论您是否提交偏移量,您都将继续使用偏移量前进。仅当您重新启动组件时,提交才会派上用场。即知道消费者应该从哪个位置开始消费。

2)如果数据库事务失败时需要移动到之前提交的偏移量,请使用Kafka Consumer中的seek方法。 公共(public)无效寻求(TopicPartition分区,长偏移)

3)为了提交各个分区的偏移量,您将需要按照您提到的那样维护每个分区的偏移量。我认为没有其他办法。

您可能不需要在每次数据库事务失败时寻找先前提交的偏移量。您可能想要暂停消费者并重试几次,从而以指数方式增加等待时间。

但是要回答关于如何在每次轮询中移动到上一个偏移量的问题,请跟踪每个分区中第一条消息的偏移量,并且在失败的情况下,在循环结束时寻找您跟踪的偏移量。

关于java - 卡夫卡与Java : how to re-read data,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53872484/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com