gpt4 book ai didi

java - Kafka消费者不消费

转载 作者:行者123 更新时间:2023-11-30 07:17:41 33 4
gpt4 key购买 nike

这是我从 java 客户端构建 kafka 消费者的代码。

  def buildConsumer[Key, Value](
configuration: KafkaConfiguration, commitInterval: Long, groupId: Option[String] = None)(
implicit keyDeserializer: Deserializer[Key], valueDeserializer: Deserializer[Value]
): KafkaJavaConsumer[Key, Value] = {
val settingsMap: Map[String, Object] = Map(
"bootstrap.servers" -> s"${configuration.bootstrapHost}:${configuration.bootstrapPort}",
"group.id" -> groupId.getOrElse(s"${configuration.topic}-${UUID.randomUUID}"),
"enable.auto.commit" -> "true",
"auto.commit.interval.ms" -> commitInterval.toString,
"auto.offset.reset" -> "earliest"
) ++ configuration.additionalOptions.getOrElse(Map.empty[String, Object])
val consumer = new KafkaJavaConsumer[Key, Value](settingsMap.asJava, keyDeserializer, valueDeserializer)
consumer.subscribe(Seq(configuration.topic).asJava)
consumer
}

我的 kafka 正在端口 6050 上运行,我已经在控制台中对其进行了测试,以从该特定端口进行生产和消费。我想知道我的问题是否与我上面的配置有关。我还使用 EmbeddedKafka 框架测试了上面的代码,问题似乎出在实际运行的 kafka 服务器上。

编辑:

我忘记补充一点,我有多个消费者(具有不同的 group.id's)从同一代理消费,不确定这是否是问题所在。

最佳答案

确保,

No. of partitions in the topic >= No. of consumer instances in the group

否则,组中的某些消费者实例将不会被分配任何分区。

要检查分区数量,请​​使用 kafka-topics.sh 命令

> sh kafka-topics.sh --zookeeper localhost:2181 --topic test --describe
主题:测试 PartitionCount:6 ReplicationFactor:1 配置:
主题:测试 分区:0 领导者:0 副本:0 Isr:0
主题:测试 分区:1 领导者:0 副本:0 Isr:0
主题:测试 分区:2 领导者:0 副本:0 Isr:0
主题:测试 分区:3 领导者:0 副本:0 Isr:0
主题:测试 分区:4 领导者:0 副本:0 Isr:0
主题:测试 分区:5 领导者:0 副本:0 Isr:0

关于java - Kafka消费者不消费,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38111628/

33 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com