gpt4 book ai didi

java - 在kafka中消费批处理时如何部分提交同步

转载 作者:行者123 更新时间:2023-11-30 01:58:58 25 4
gpt4 key购买 nike

我们正在使用 kafka 进行批量消费。我们使用 X 消息并将它们放在 MYSQL 上然后提交它们。

我们有时会向 MYSQL 进行部分插入(重复记录、其他失败等)

使用文档中的示例:

List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
consumer.commitSync();
buffer.clear();
}

我们希望仅提交同步成功的记录,同时让 kafka 重播失败的记录。

但我无法理解如何执行此操作,因为 api 在整个批处理上仅获得 commitSync()。

想法?

最佳答案

在 Kafka 中,您不会提交特定记录,即您无法将偏移量 N 标记为已处理,将偏移量 N-1 标记为未处理。通过提交偏移量 N,您可以表明您已经处理了最多 N 个记录。

处理偏移N失败时可以做的事情:

  • 提交 N-1 (使用 commitSync(java.util.Map<TopicPartition,OffsetAndMetadata> offsets) )并重试处理偏移量 N,因为它仍然在内存中。仅当 N 成功处理后,您才提交 N 并移至更新的记录。

  • 假设您在 Kafka Connect 中的 Sink Connector 中运行,在处理 N 失败时,您可以将记录转发到 Connect 的 Deal Letter 队列。否则将其推回另一个主题以供稍后处理。这会暂时跳过偏移量 N(如果可以的话,您也可以删除它)。

您也可以混合使用这两种方法,尝试几次重试,但如果无法处理此记录,请保存/删除它并继续处理较新的记录。

关于java - 在kafka中消费批处理时如何部分提交同步,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53478639/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com