gpt4 book ai didi

java - 为什么所有分区没有分配给正在运行的单个 Kafka Consumer?

转载 作者:行者123 更新时间:2023-11-30 05:28:38 25 4
gpt4 key购买 nike

我正在尝试读取主题“input_topic”中的最后 3 条记录。我只使用一个消费者。但它只消耗一个分区的记录。

当我手动分配其他分区时,出现错误“您只能检查分配给该消费者的分区的位置”。但我使用的是单个消费者。我无法理解这个问题。如果可能的话请帮助我。

Properties properties = new Properties();
properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG,"4");
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
String topic = "input_topic";
TopicPartition topicPartition = new TopicPartition(topic, 0);
TopicPartition topicPartition1 = new TopicPartition(topic, 1);
TopicPartition topicPartition2 = new TopicPartition(topic, 2);
List<TopicPartition> topics = Arrays.asList(topicPartition1,topicPartition,topicPartition2);
while (true) {
Thread.sleep(5000);
consumer.assign(topics);
consumer.seekToEnd(topics);

long current = consumer.position(topicPartition);
consumer.seek(topicPartition, current-3);
ConsumerRecords<String, String> records = consumer.poll(100);
System.out.println("-------------------------------------------> "+ records.count());
System.out.println("-------------------------------------------> "+ LocalDateTime.now());
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
System.out.println("_________________________" + record.partition());
}
}

最佳答案

我的猜测...除了哈蒂斯所说的关于分配应该在循环之外进行一次之外,我从你的代码中看到了这一点。您查找所有主题分区末尾的位置,然后查找主题分区 0 的最后 3 条记录的偏移量。此时,轮询只能使用主题分区 0 中的这 3 条记录,而不能使用其他分区中的这 3 条记录,因为您在它们上的位置位于末尾(当然,如果您不向这些分区发送更多消息,则这是真的) )。

关于java - 为什么所有分区没有分配给正在运行的单个 Kafka Consumer?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58018767/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com