gpt4 book ai didi

apache-kafka - 确定哪个主题在 Kafka 消费者中有记录

转载 作者:行者123 更新时间:2023-12-04 10:50:36 25 4
gpt4 key购买 nike

我一直在关注 Kafka 消费者类(class)。我可以将主题作为列表对象传递。我指的是以下文章https://docs.confluent.io/current/clients/java.html ,但是我需要知道一旦消费者类订阅了主题,我怎么知道哪个主题中有记录。有什么办法可以查到吗?这是代码:

public abstract class ConsumeLoop implements Runnable {
private final KafkaConsumer<K, V> consumer;
private final List<String> topics;
private final CountDownLatch shutdownLatch;

public BasicConsumeLoop(KafkaConsumer<K, V> consumer, List<String> topics) {
this.consumer = consumer;
this.topics = topics;
this.shutdownLatch = new CountDownLatch(1);
}

public abstract void process(ConsumerRecord<K, V> record);

public void run() {
try {
consumer.subscribe(topics); --> Consuming list of topics

while (true) {
ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE); --> Which topic is returning the records?
records.forEach(record -> process(record));
}
} catch (WakeupException e) {
// ignore, we're closing
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
consumer.close();
shutdownLatch.countDown();
}
}
}

最佳答案

partitions()方法来自 ConsumerRecords将返回一组 TopicPartition s:

partitions - Get the partitions which have records contained in this record set.



然后您可以迭代该集合以获得 topic()姓名和 partition()数字,取决于您的需要。例如:
   for (TopicPartition tp : records.partitions()) {
System.out.println("Got " + records.records(tp).size() + " records "
+ "from topic:partition " + tp.topic() + ":" + tp.partition());
}

关于apache-kafka - 确定哪个主题在 Kafka 消费者中有记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59490848/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com