gpt4 book ai didi

hadoop - 无法使用 kafka 命令行将 json tweets 事件发送到 Kafka 主题/生产者

转载 作者:可可西里 更新时间:2023-11-01 14:48:43 24 4
gpt4 key购买 nike

我创建了一个 python 脚本 raw_tweets_stream.py 以使用 twitter api 流式传输 twitter 数据。使用下面的脚本将来自 twitter 的 json 数据传递给 kafka 生产者。


`python raw_tweets_stream.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list localhost:2181 --topic raw_json_tweets`

raw_json_tweets 是为这些推文创建的 kafka 主题。 python 脚本 raw_tweets_stream.py 运行得很好,但在将它发送给 kafka 生产者时会抛出错误。我正在使用 Hortonworks HDP 2.3.1 沙箱,并且我已确保启动了 zookeeper 和 kafka。


/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic raw_json_tweets

Topic:raw_json_tweets      PartitionCount:1        ReplicationFactor:1     Configs:
Topic: raw_json_tweets Partition: 0 Leader: 0 Replicas: 0 Isr: 0

错误:

[2016-08-25 22:36:26,212] ERROR Failed to send requests for topics raw_json_tweets with correlation ids in [57,64] (kafka.producer.async.DefaultEventHandler)
[2016-08-25 22:36:26,213] ERROR Error in handling batch of 131 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:91)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2016-08-25 22:36:27,217] WARN Fetching topic metadata with correlation id 65 for topics [Set(json_tweets1)] from broker [BrokerEndPoint(0,localhost,2181)] failed (kafka.client.ClientUtils$)
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
at kafka.utils.CoreUtils$.read(CoreUtils.scala:193)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:131)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:77)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
at kafka.producer.SyncProducer.send(SyncProducer.scala:115)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:188)
at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:152)
at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:151)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

更新:解决方案


  1. 转到 Ambari 服务并将 Kafka 日志目录更改为 /tmp/kafka-logs
  2. 修改了原始脚本以包含正确的端口和主机名。

    python raw_tweets_stream.py |/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list sandbox.hortonworks.com:6667 --topic raw_json_tweets

  3. 已使用控制台消费者验证事件已发送到 kafka 主题。

    /usr/hdp/2.3.0.0-2557/kafka/bin/kafka-console-consumer.sh -zookeeper sandbox.hortonworks.com:2181 -topic raw_json_tweets -from-beginning

最佳答案

看起来你正在将 --broker-list 指向 zookeeper (2181) 而不是你需要指向默认端口为 的 kafka broker >9092 或 Ambari 上的 6667

关于hadoop - 无法使用 kafka 命令行将 json tweets 事件发送到 Kafka 主题/生产者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39156065/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com