
apache-kafka - Kafka fails to start (version 0.8.0 beta1)


I am trying to start the Kafka service in standalone mode (on EC2), using ZooKeeper version 3.3.6.
So I run 1) sbt update, 2) sbt package, 3) sbt assembly-package-dependency, then start the ZooKeeper service, followed by the Kafka server.
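For reference, the full sequence looks roughly like this (a sketch; it assumes the commands run from the Kafka 0.8.0-beta1 source root and that ZK_HOME points at the ZooKeeper 3.3.6 install, so adjust the paths to your layout):

sbt update
sbt package
sbt assembly-package-dependency
$ZK_HOME/bin/zkServer.sh start                          # standalone ZooKeeper, reads conf/zoo.cfg
./bin/kafka-server-start.sh config/server.properties    # then the Kafka broker

However, I get the following error messages.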
For the Kafka server log:

ERROR Error while electing or becoming leader on broker 0 (kafka.server.ZookeeperLeaderElector)
java.net.ConnectException: Connection timed out
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:465)
at sun.nio.ch.Net.connect(Net.java:457)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$addNewBroker(ControllerChannelManager.scala:84)
at kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:35)
at kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:35)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:81)
at kafka.controller.ControllerChannelManager.<init>(ControllerChannelManager.scala:35)
at kafka.controller.KafkaController.startChannelManager(KafkaController.scala:503)
at kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:467)
at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:215)
at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:89)
at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:53)
at kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:106)
at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)

For the ZooKeeper log:
2014-07-15 15:49:22,996 - INFO [ProcessThread:-1:PrepRequestProcessor@419] - Got user-level KeeperException when processing sessionid:0x1473b82f52e0004 type:create cxid:0x57 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/brokers/topics/edwintest Error:KeeperErrorCode = NodeExists for /brokers/topics/edwintest
2014-07-15 15:49:23,102 - INFO [ProcessThread:-1:PrepRequestProcessor@419] - Got user-level KeeperException when processing sessionid:0x1473b82f52e0004 type:create cxid:0x59 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/brokers/topics/edwintest Error:KeeperErrorCode = NodeExists for /brokers/topics/edwintest
2014-07-15 15:49:23,109 - INFO [ProcessThread:-1:PrepRequestProcessor@419] - Got user-level KeeperException when processing sessionid:0x1473b82f52e0004 type:create cxid:0x5b zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/brokers/topics/edwintest Error:KeeperErrorCode = NodeExists for /brokers/topics/edwintest
2014-07-15 15:49:23,215 - INFO [ProcessThread:-1:PrepRequestProcessor@419] - Got user-level KeeperException when processing sessionid:0x1473b82f52e0004 type:create cxid:0x5d zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/brokers/topics/edwintest Error:KeeperErrorCode = NodeExists for /brokers/topics/edwintest

For the Kafka producer log:
[2014-07-15 15:49:23,107] INFO Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 23 for 1 topic(s) Set(edwintest) (kafka.client.ClientUtils$)
[2014-07-15 15:49:23,107] INFO Connected to localhost:9092 for producing (kafka.producer.SyncProducer)
[2014-07-15 15:49:23,111] INFO Disconnecting from localhost:9092 (kafka.producer.SyncProducer)
[2014-07-15 15:49:23,111] WARN Error while fetching metadata [{TopicMetadata for topic edwintest ->
No partition metadata for topic edwintest due to kafka.common.LeaderNotAvailableException}] for topic [edwintest]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2014-07-15 15:49:23,112] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: edwintest (kafka.producer.async.DefaultEventHandler)
[2014-07-15 15:49:23,112] INFO Back off for 100 ms before retrying send. Remaining retries = 0 (kafka.producer.async.DefaultEventHandler)
[2014-07-15 15:49:23,213] INFO Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 24 for 1 topic(s) Set(edwintest) (kafka.client.ClientUtils$)
[2014-07-15 15:49:23,213] INFO Connected to localhost:9092 for producing (kafka.producer.SyncProducer)
[2014-07-15 15:49:23,217] INFO Disconnecting from localhost:9092 (kafka.producer.SyncProducer)
[2014-07-15 15:49:23,218] WARN Error while fetching metadata [{TopicMetadata for topic edwintest ->
No partition metadata for topic edwintest due to kafka.common.LeaderNotAvailableException}] for topic [edwintest]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2014-07-15 15:49:23,219] ERROR Failed to send requests for topics edwintest with correlation ids in [17,24] (kafka.producer.async.DefaultEventHandler)
[2014-07-15 15:49:23,219] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
at scala.collection.immutable.Stream.foreach(Stream.scala:254)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)

My /etc/hosts configuration:
127.0.0.1       ip-172-32-1-95 localhost.localdomain localhost
::1 localhost6.localdomain6 localhost6
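
(Since the broker advertises the value returned by java.net.InetAddress.getCanonicalHostName() when host.name is unset — see the server.properties comments below — a quick sanity check that the EC2 hostname resolves and that the broker and ZooKeeper ports are reachable can help. A sketch, with hostnames and ports taken from the configs in this question:)

ping -c 1 ip-172-32-1-95       # should resolve to 127.0.0.1 via /etc/hosts
nc -vz localhost 2181          # ZooKeeper client port
nc -vz localhost 9092          # Kafka broker port
nc -vz ip-172-32-1-95 9092     # the hostname the controller may try to connect to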

My server.properties file:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The port the socket server listens on
port=9092

# Hostname the broker will bind to and advertise to producers and consumers.
# If not set, the server will bind to all interfaces and advertise the value returned from
# from java.net.InetAddress.getCanonicalHostName().
#host.name=localhost

# The number of threads handling network requests
num.network.threads=2

# The number of threads doing disk I/O
num.io.threads=2

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# The directory under which to store log files
log.dir=/tmp/kafka-logs

# The number of logical partitions per topic per server. More partitions allow greater parallelism
# for consumption, but also mean more files.
num.partitions=1

############################# Log Flush Policy #############################

# The following configurations control the flush of data to disk. This is the most
# important performance knob in kafka.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
# 2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
# 3. Throughput: The flush is generally the most expensive operation.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
log.flush.interval.ms=1000

# Per-topic overrides for log.flush.interval.ms
#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=536870912

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.cleanup.interval.mins=1

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000

# metrics reporter properties
kafka.metrics.polling.interval.secs=5
kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
kafka.csv.metrics.dir=/tmp/kafka_metrics
# Disable csv reporting by default.
kafka.csv.metrics.reporter.enabled=false

My ZooKeeper configuration (zoo.cfg):
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181

I tried deleting all of the Kafka and ZooKeeper state under /tmp/zookeeper and /tmp/kafka-logs and restarting everything, but I still get the same errors.
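(The cleanup itself, with both services stopped first, is just the following; the paths come from the log.dir and dataDir settings above:)

rm -rf /tmp/kafka-logs /tmp/zookeeper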

Best Answer

Cool! I am guessing you are running kafka-console-producer to publish messages to the topic "edwintest". Before running your producer, create the topic with this command:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic edwintest

Then start your console producer. Hopefully that should fix your problem.
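
For example (a sketch; script names vary a bit across 0.8.x releases, so use the ones shipped with your build):

./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic edwintest    # verify a leader was assigned
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic edwintest   # then type messages to send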

[Edit]
Obviously, you also have to make sure your EC2 security group is updated correctly so that the ZooKeeper and Kafka broker ports are open to the producer.
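
If you manage the security group with the AWS CLI, the rules look roughly like this (a sketch; sg-xxxxxxxx and the CIDR are placeholders for your own group ID and your producer's address range):

aws ec2 authorize-security-group-ingress --group-id sg-xxxxxxxx --protocol tcp --port 2181 --cidr 203.0.113.0/24
aws ec2 authorize-security-group-ingress --group-id sg-xxxxxxxx --protocol tcp --port 9092 --cidr 203.0.113.0/24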

Regarding "apache-kafka - Kafka fails to start (version 0.8.0 beta1)", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/24767337/
