gpt4 book ai didi

apache-kafka - KafkaStreams - InconsistentGroupProtocolException

转载 作者:行者123 更新时间:2023-12-04 18:02:36 24 4
gpt4 key购买 nike

我有一个 Kafka Streams 应用程序,它使用 Kafka Streams DSL 连接到我们的 Kafka 集群,如下所示:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, byte[]> stream = builder.stream(myTopic);

// do work

kStreams = new KafkaStreams(builder, config);
kStreams.start();

我的代码库的另一部分直接使用消费者客户端建立到我们的集群的连接。

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config, keyDeserializer, valueDeserializer);
consumer.subscribe(Collections.singletonList(sourceTopic));
consumer.poll(500L);
// etc
consumer.close();

我这样做的原因是在有条件地启动应用程序的其他部分(包括 Kafka Streams 拓扑)之前收集有关消费者组的元数据。可能还有其他方法可以做到这一点(例如,通过各种钩子(Hook)或其他方法),但我更好奇为什么这些方法的混合有时会(间歇性地)导致 InconsistentGroupProtocolException被抛出。

有人可以解释一下为什么要抛出这个吗?我很难从源代码本身确定到底发生了什么,但我猜 Kafka Streams 构建的底层消费者指定的分区协议(protocol)与 KafkaConsumer 不同。客户。无论如何,将不胜感激任何有助于理解此异常的帮助

最佳答案

你自己提出答案。 Kafka Streams 使用自定义分区分配器,Kafka Streams 客户端仅适用于其他 Kafka Streams 客户端。如果您使用 KafkaConsumer使用与您的 Kafka Streams 应用程序相同的组 ID,它将无法隔离 KafkaConsumer s 加入 Kafka Streams 消费者组。显然,KafkaConsumer无法与 Kafka Streams 一起“玩”。

关于apache-kafka - KafkaStreams - InconsistentGroupProtocolException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41601771/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com