gpt4 book ai didi

apache-kafka - 未能构建 kafka 消费者

转载 作者:行者123 更新时间:2023-12-04 05:29:07 25 4
gpt4 key购买 nike

关于这个主题有很多答案,但没有任何效果。

我正在尝试执行以下流处理器。

object simplestream extends App {

val builder: KStreamBuilder = new KStreamBuilder

val streamingConfig = { //ToDo - Move these to config
val settings = new Properties
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "example11")
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
// Specify default (de)serializers for record keys and for record values.
settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass.getName)
settings
}

val users = builder.stream("tt2")

users.print()
val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
stream.start()

}
}

依赖项:
   //kafka
"org.apache.kafka" % "kafka-streams" % "0.10.2.0",
"org.apache.kafka" % "kafka-clients" % "0.10.2.0"

和错误:
  [error] (run-main-1) org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:566)
at org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getConsumer(DefaultKafkaClientSupplier.java:38)
at org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:323)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:349)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:272)
at kafka.simplestream$.runStream(simplestream.scala:36)
at kafka.simplestream$.delayedEndpoint$kafka$simplestream$1(simplestream.scala:40)
at kafka.simplestream$delayedInit$body.apply(simplestream.scala:12)
at scala.Function0.apply$mcV$sp(Function0.scala:34)
at scala.Function0.apply$mcV$sp$(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App.$anonfun$main$1$adapted(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:378)
at scala.App.main(App.scala:76)
at scala.App.main$(App.scala:74)
at kafka.simplestream$.main(simplestream.scala:12)
at kafka.simplestream.main(simplestream.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.Metadata.update(Lorg/apache/kafka/common/Cluster;J)V

我试过不同的客户端版本,没有运气。我正在使用 kafka 0.10.2.0 版本。我也在zookeeper中遇到以下错误。
[2017-08-18 13:08:10,260] INFO Got user-level KeeperException when processing sessionid:0x15df53e101e0001 type:delete cxid:0x29 zxid:0x4d txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-08-18 13:08:10,364] INFO Got user-level KeeperException when processing sessionid:0x15df53e101e0001 type:create cxid:0x35 zxid:0x4e txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NodeExists for /brokers (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-08-18 13:08:10,364] INFO Got user-level KeeperException when processing sessionid:0x15df53e101e0001 type:create cxid:0x36 zxid:0x4f txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor)

不确定究竟是什么原因造成的。不过,我能够很好地消费/生产。

最佳答案

java.lang.NoSuchMethodError - 当您的类路径中存在多个版本的客户端 jar 时,会发生此错误。检查您的类路径一次。
KeeperException zookeeper 抛出的问题不是问题,它只是创建了 Zookeeper 中不存在的节点/文件夹。

关于apache-kafka - 未能构建 kafka 消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45756434/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com