gpt4 book ai didi

java - Kafka 消费者 - 分配和查找

转载 作者:行者123 更新时间:2023-12-02 02:29:43 25 4
gpt4 key购买 nike

我无法理解这个 API 设计!

在下面的代码中,我们订阅了具有动态分配分区的主题列表。这完全没问题。

    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Arrays.asList("some-topic"));

while(true){

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
StreamSupport.stream(records.spliterator(), false)
.forEach(r -> {
System.out.println(r.key() + "::" + r.value());
});


}

这里很困惑。

    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
//seek for specific partition
TopicPartition partition = new TopicPartition("some-topic", 0);
consumer.assign(Arrays.asList(partition));
consumer.seek(partition, 0);

while(true){

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
StreamSupport.stream(records.spliterator(), false)
.forEach(r -> {
System.out.println(r.key() + "::" + r.value());
});


}

问题:

  1. 我们已经使用 assign 方法分配了分区列表。为什么 seek 方法还要查找分区信息?不知怎的,我觉得它是多余的
  2. seek 方法具有带有主题和偏移量的分区。为什么需要先进行分配才能调用seek

最佳答案

1 - 首先请记住,您的消费者可以通过 kafka API 分配给许多不同的主题/分区。然后,查找和分配有两个不同的独立职责,这就是为什么您可能认为它是多余的,但是每当您需要返回偏移量,或者出于任何原因需要重做偏移量时,您都会使用查找,为此,seek()需要主题和分区信息,您使用静态分配(分配)或动态(订阅)。

你不能只使用seek()而不指定主题)/分区,这在很多情况下会产生歧义。

2 - 您确定在调用seek()之前需要进行赋值吗?我知道两者都可以在调用 poll() 之前使用..,但不知道在eek() 之前分配是强制性的..,您是否有错误消息(我明天可能会检查并编辑这篇文章)

雅尼克

关于java - Kafka 消费者 - 分配和查找,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57236899/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com