gpt4 book ai didi

apache-kafka - 查找分配给 Kafka 流实例的分区

转载 作者:行者123 更新时间:2023-12-04 11:36:50 25 4
gpt4 key购买 nike

我有一个订阅许多主题的 Kafka 流应用程序,每个主题都有许多分区。
当我创建应用程序拓扑并启动它时,我是否知道将哪些主题的哪些分区分配给我的应用程序的当前实例?我想知道这与此实例是否处理任何记录无关。

我知道当我拿到记录时,我可以做到processorContext.partition()processorContext.topic()获取正在处理的当前记录的分区/主题信息。但我不是在寻找那个。

我正在寻找等效的 KafkaConsumer.assigment在 kafka 溪流一侧。

我也尝试了以下代码,但我得到 s 的大小为 0。

<Prepare builder and sconfig>
kafkaStream = new KafkaStreams (builder, sconfig);
kafkaStream.start ();
Collection<StreamsMetadata> s = kafkaStream.allMetadata();
System.out.println("StreamsMetadata: size is " + s.size());
for (StreamsMetadata m : s) {
Set<TopicPartition> tp = m.topicPartitions();
System.out.println ("TopicPartition: " + tp.toString());
}

最佳答案

更新答案(2020 年 11 月):

When I create the application topology and start it, do I know what partitions of what topics are assigned to current instance of my application?


如果我理解正确,您可以按如下方式执行此操作。在您的应用程序实例中,使用 KafkaStreams#localThreadsMetadata()获取 ThreadMetadata对于(该应用程序实例的)所有本地流线程。 ThreadMetadata包含 TaskMetadata用于该流线程上的所有事件和备用任务。 TaskMetadata有一个方法 topicPartitions()获取输入主题分区。
旧的、过时的答案:据我所知,Kafka Streams 中没有公开这些信息的现有 API。可以从 Kafka 消费者(由 Kafka Streams 使用)获取此信息,但它不会在 Kafka Streams 中公开。

关于apache-kafka - 查找分配给 Kafka 流实例的分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56229947/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com