gpt4 book ai didi

java - 使用Java Kafka Consumer不进行commit操作是否正确?

转载 作者:行者123 更新时间:2023-12-02 10:07:12 25 4
gpt4 key购买 nike

我需要读取从开始偏移到结束偏移的记录集。为此,我使用专用的 Kafka 消费者。我可以接受至少一次语义(以防万一,如果给定的应用程序实例出现故障,并且新的应用程序实例从该起始偏移量重新读取记录)。

那么我可以使用这样的代码吗?

private static KafkaConsumer<Long, String> createConsumer() {

final Properties props = new Properties();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

return new KafkaConsumer<>(props);
}

public void process() {

KafkaConsumer consumer = createConsumer();
TopicPartition topicPartition = new TopicPartition("topic", 2);
consumer.assign(List.of(topicPartition));

long startOffset = 42;
long endOffset = 100;

consumer.seek(topicPartition, startOffset);

boolean isRunning = true;
while (isRunning) {
final ConsumerRecords<Long, String> consumerRecords = consumer.poll(1000);

for (ConsumerRecord<Long, String> record : consumerRecords) {
if (record.offset() >= endOffset) {
isRunning = false;
break;
}
}
}

consumer.close();
}

所以:

  • 我没有commit()
  • 我禁用自动提交
  • 我没有group-id

代码正确吗?还是有什么隐藏的问题?

最佳答案

是的,这是正确的用法,您不应该遇到任何问题。这不是 Kafka 消费者的典型用法,但这是允许的。

来自官方 KafkaConsumer javadoc(我的亮点):

https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

Controlling The Consumer's Position

In most use cases the consumer will simply consume records from beginning to end, periodically committing its position (either automatically or manually). However Kafka allows the consumer to manually control its position, moving forward or backwards in a partition at will. This means a consumer can re-consume older records, or skip to the most recent records without actually consuming the intermediate records. There are several instances where manually controlling the consumer's position can be useful.

...

Kafka allows specifying the position using seek(TopicPartition, long) to specify the new position. Special methods for seeking to the earliest and latest offset the server maintains are also available ( seekToBeginning(Collection) and seekToEnd(Collection) respectively).

关于java - 使用Java Kafka Consumer不进行commit操作是否正确?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55267019/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com