gpt4 book ai didi

java - 处理从 Kafka 检索到的每条记录后,正确的提交方式是什么?

转载 作者:塔克拉玛干 更新时间:2023-11-02 19:43:47 25 4
gpt4 key购买 nike

我在理解如何为我使用的每条记录正确手动提交时遇到一些麻烦。

首先,让我们看一个来自 https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html 的例子

while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
consumer.commitSync();
buffer.clear();
}
}

此示例仅在处理完轮询中收到的所有记录后才提交。我认为这不是一个好方法,因为如果我们收到三个记录,而我的服务在处理第二个记录时终止,它最终会再次使用第一个记录,这是不正确的。

所以还有第二个例子,涵盖了在每个分区的基础上提交记录:

try {
while(running) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
consumer.close();
}

但是,我认为这也存在同样的问题,它仅在处理了来自特定分区的所有记录后才提交。

我设法想出的解决方案是这样的:

        val consumer: Consumer<String, MyEvent> = createConsumer(bootstrap)
consumer.subscribe(listOf("some-topic"))

while (true) {
val records: ConsumerRecords<String, MyEvent> = consumer.poll(Duration.ofSeconds(1))
if (!records.isEmpty) {
mainLogger.info("Received ${records.count()} events from CRS kafka topic, with partitions ${records.partitions()}")
records.forEach {
mainLogger.debug("Record at offset ${it.offset()}, ${it.value()}")
processEvent(it.value()) // Complex event processing occurs in this function
consumer.commitSync(mapOf(TopicPartition(it.topic(), it.partition()) to OffsetAndMetadata (it.offset() + 1)))
}
}
}

现在这似乎在我测试时起作用了。到目前为止,在我的测试过程中,似乎只使用了一个分区(我已经通过记录 records.partitions() 检查了这一点)。

这种方法会导致任何问题吗? Consumer API 似乎没有提供在不指定分区的情况下提交偏移量的方法,这对我来说似乎有点奇怪。我在这里遗漏了什么吗?

最佳答案

没有正确或错误的提交方式。这实际上取决于您的用例和应用程序。

提交每个偏移量可以提供更精细的控制,但它会影响性能。另一方面,您可以每隔 X 秒异步提交一次(就像自动提交那样)并且开销非常小,但控制却少得多。


在第一个示例中,事件是批量处理和提交的。这在性能方面很有趣,但如果出现错误,可以重新处理整个批处理。

在第二个示例中,它也是批处理,但仅针对每个分区。这应该会导致更小的批处理,从而降低性能,但在出现问题时减少重新处理。

在您的最后一个示例中,您选择提交每条消息。虽然这提供了最大的控制,但它会显着影响性能。此外,与其他情况一样,它也不是完全防错的。

如果应用程序在处理事件之后但在提交之前崩溃,则在重新启动时可能会重新处理最后一个事件(即至少一次语义)。但至少,只有一个事件会受到影响。

如果你想要exactly once语义,你需要使用Transactional Producer .

关于java - 处理从 Kafka 检索到的每条记录后,正确的提交方式是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57320831/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com