
python - Unable to make requests to a Kafka container from another container using kafka-python

Repost. Author: 行者123. Updated: 2023-12-02 19:03:21

Environment:

services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - 2181
  kafka:
    image: wurstmeister/kafka
    ports:
      - 9092:9092
      #- 8004:8004
    links:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_CREATE_TOPICS: "foo:10:1"
      # JMX_PORT: 8004
  clickhouse-01:
    image: yandex/clickhouse-server
    hostname: clickhouse-01
    container_name: clickhouse-01
    ports:
      - 9001:9000
    volumes:
      - ./config/config.xml:/etc/clickhouse-server/config.xml
      - ./config/metrika.xml:/etc/clickhouse-server/metrika.xml
      - ./config/macros/macros-01.xml:/etc/clickhouse-server/config.d/macros.xml
    ulimits:
      nofile:
        soft: 262144
        hard: 262144
    depends_on:
      - "zookeeper"

  clickhouse-02:
    image: yandex/clickhouse-server
    hostname: clickhouse-02
    container_name: clickhouse-02
    ports:
      - 9002:9000
    volumes:
      - ./config/config.xml:/etc/clickhouse-server/config.xml
      - ./config/metrika.xml:/etc/clickhouse-server/metrika.xml
      - ./config/macros/macros-02.xml:/etc/clickhouse-server/config.d/macros.xml
    ulimits:
      nofile:
        soft: 262144
        hard: 262144
    depends_on:
      - "zookeeper"

  clickhouse-03:
    image: yandex/clickhouse-server
    hostname: clickhouse-03
    container_name: clickhouse-03
    ports:
      - 9003:9000
    volumes:
      - ./config/config.xml:/etc/clickhouse-server/config.xml
      - ./config/metrika.xml:/etc/clickhouse-server/metrika.xml
      - ./config/macros/macros-03.xml:/etc/clickhouse-server/config.d/macros.xml
    ulimits:
      nofile:
        soft: 262144
        hard: 262144
    depends_on:
      - "zookeeper"

Querying Kafka via the Zookeeper container:
bash-4.4# /opt/kafka/bin/kafka-topics.sh --list --zookeeper zookeeper:2181
__consumer_offsets
foo
raw_trap

Netstat results from inside the Zookeeper container:
root@0a5f9a441da3:/opt/zookeeper-3.4.13# netstat
Active Internet connections (w/o servers)
Proto Recv-Q Send-Q Local Address Foreign Address State
tcp 0 0 0a5f9a441da3:2181 kafka_1:58622 ESTABLISHED
tcp 0 0 0a5f9a441da3:2181 clickhouse-02.cli:60728 ESTABLISHED
tcp 0 0 0a5f9a441da3:2181 clickhouse-01.cli:56448 ESTABLISHED
tcp 0 0 0a5f9a441da3:2181 clickhouse-03.cli:39656 ESTABLISHED

Telnet from the container running kafka-python to the broker:
root@f10fe1b58fa9:~# telnet kafka 9092
Trying 172.18.0.8...
Connected to kafka.
Escape character is '^]'.

Kafka error triggered by the telnet connection:
kafka_1           | [2019-06-23 13:38:05,350] WARN [SocketServer brokerId=1019] Unexpected error from /172.18.0.5; closing connection (org.apache.kafka.common.network.Selector)
kafka_1 | org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1903520116 larger than 104857600)
kafka_1 | at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:104)
kafka_1 | at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
kafka_1 | at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
kafka_1 | at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
kafka_1 | at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
kafka_1 | at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
kafka_1 | at kafka.network.Processor.poll(SocketServer.scala:830)
kafka_1 | at kafka.network.Processor.run(SocketServer.scala:730)
kafka_1 | at java.lang.Thread.run(Thread.java:748)
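
A note on the size in this warning: it is not a real request length. Kafka's wire protocol prefixes every request with a 4-byte big-endian length, so the broker reads the first four bytes of whatever arrives on the socket as that length. Packing 1903520116 back into four bytes gives the ASCII text `quit`, which suggests (an assumption, since the input typed into the telnet session is not shown) that this is what was entered. The limit of 104857600 bytes is 100 MiB, Kafka's default for socket.request.max.bytes. A minimal sketch of the decoding:

```python
import struct

# Size reported by the broker in the InvalidReceiveException above.
reported_size = 1903520116

# Kafka frames each request with a 4-byte big-endian signed length prefix,
# so pack the number back into the four bytes the broker actually received.
first_four_bytes = struct.pack(">i", reported_size)
print(first_four_bytes)  # b'quit'

# The limit quoted in the log message is 100 MiB.
limit = 104857600
print(limit == 100 * 1024 * 1024)  # True
```

Read this way, the warning mainly shows that non-Kafka data reached the broker over telnet. It confirms that the port is reachable, and by itself it does not point to a broker fault.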

Error when trying to send data to a Kafka topic from Python:
>>> from kafka import KafkaProducer
>>> producer = KafkaProducer(bootstrap_servers=['kafka:9092'])
>>> producer
<kafka.producer.kafka.KafkaProducer object at 0x7ff84417b320>
>>> producer.send('foo', b'raw_bytes')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.7/site-packages/kafka/producer/kafka.py", line 564, in send
    self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
  File "/usr/local/lib/python3.7/site-packages/kafka/producer/kafka.py", line 691, in _wait_on_metadata
    "Failed to update metadata after %.1f secs." % (max_wait,))
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.

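I have searched the web repeatedly for a solution. I first made sure that KAFKA_ADVERTISED_HOST_NAME in the container was correct, then tried changing it, with no result. When I change the endpoint in the bootstrap_servers=['kafka:9092'] entry, I get this error: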
>>> consumer = KafkaConsumer('foo', 
... group_id='test-group',
... bootstrap_servers=['localhost:9092'])
Traceback (most recent call last):
  File "<stdin>", line 3, in <module>
  File "/usr/local/lib/python3.7/site-packages/kafka/consumer/group.py", line 353, in __init__
    self._client = KafkaClient(metrics=self._metrics, **self.config)
  File "/usr/local/lib/python3.7/site-packages/kafka/client_async.py", line 239, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/usr/local/lib/python3.7/site-packages/kafka/client_async.py", line 865, in check_version
    raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable

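So it looks like I may be establishing a connection, but I may be fundamentally misunderstanding the request I am trying to make with the producer.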

Here are the documentation and examples for the Python library I am testing: https://kafka-python.readthedocs.io/en/master/usage.html#kafkaconsumer

Edit: Using a consumer, I have successfully received messages from a production Kafka environment running on bare metal.

Best answer

Quoting Robin Moffatt's excellent blog post on Kafka listeners and Docker:

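If you are using Docker, you need to set KAFKA_ADVERTISED_LISTENERS to the external address (host or IP) so that clients can correctly connect to it. Otherwise they will try to connect to the internal host address, and if that address is not reachable, problems ensue.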

https://rmoff.net/2018/08/02/kafka-listeners-explained/
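
As a rough illustration of what such a setting might look like for the compose file in the question, the sketch below builds the listener-related environment entries for two named listeners. The helper name build_listener_env, the listener names INSIDE and OUTSIDE, and the internal port 9093 are assumptions made for this example, not values from the question. The variable names follow the wurstmeister/kafka convention of mapping KAFKA_* variables onto broker properties; check the README of the image version you use before relying on them.

```python
def build_listener_env(inside, outside):
    """Build broker listener settings for two named listeners.

    inside  -- (host, port) advertised to clients on the Docker network
    outside -- (host, port) advertised to clients on the Docker host
    """
    inside_host, inside_port = inside
    outside_host, outside_port = outside
    return {
        # Addresses the broker binds to inside its own container.
        "KAFKA_LISTENERS": (
            f"INSIDE://0.0.0.0:{inside_port},OUTSIDE://0.0.0.0:{outside_port}"
        ),
        # Addresses handed back to clients in metadata responses.
        "KAFKA_ADVERTISED_LISTENERS": (
            f"INSIDE://{inside_host}:{inside_port},"
            f"OUTSIDE://{outside_host}:{outside_port}"
        ),
        # Both listeners use unencrypted, unauthenticated connections here.
        "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT",
        "KAFKA_INTER_BROKER_LISTENER_NAME": "INSIDE",
    }


# Hypothetical values: containers would use kafka:9093, the host localhost:9092.
env = build_listener_env(inside=("kafka", 9093), outside=("localhost", 9092))
for key, value in env.items():
    print(f"{key}: {value}")
```

With values like these, a client in another container on the same Docker network would bootstrap against kafka:9093, while a client on the Docker host would use localhost:9092 through the published port. These entries would presumably replace KAFKA_ADVERTISED_HOST_NAME and KAFKA_ADVERTISED_PORT in the environment block rather than sit alongside them.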

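A Kafka client connection is actually a two-step process: the client first connects to a bootstrap server to request metadata about the whole cluster, and then connects to one or more cluster nodes using the advertised listener host names and ports.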
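
The two-step process can be sketched as a toy model with no real network I/O. The function simulate_connect and the reachable sets are hypothetical and exist only for illustration. The model shows why a client can fail at either step depending on which addresses are reachable from where it runs.

```python
def simulate_connect(bootstrap, advertised, reachable):
    """Model the two-step Kafka client connection.

    bootstrap  -- (host, port) the client is configured with
    advertised -- (host, port) the broker returns in its metadata response
    reachable  -- set of (host, port) pairs reachable from the client
    """
    # Step 1: contact the bootstrap server to fetch cluster metadata.
    if bootstrap not in reachable:
        return "bootstrap failed"
    # Step 2: connect to the address the broker advertises for itself.
    if advertised not in reachable:
        return "metadata received, advertised address unreachable"
    return "connected"


advertised = ("kafka", 9092)

# Client in another container using localhost: localhost is the client's
# own container, so the bootstrap step never reaches the broker.
print(simulate_connect(("localhost", 9092), advertised, {("kafka", 9092)}))
# bootstrap failed

# Client on the Docker host: the published port is reachable, but the
# advertised name "kafka" is assumed not to resolve on the host.
print(simulate_connect(("localhost", 9092), advertised, {("localhost", 9092)}))
# metadata received, advertised address unreachable

# Client on the same Docker network, bootstrapping with the advertised name.
print(simulate_connect(("kafka", 9092), advertised, {("kafka", 9092)}))
# connected
```

The first case is comparable to the NoBrokersAvailable error in the question, where bootstrap_servers was changed to localhost:9092 inside the client container. The second case is the situation the quoted blog post warns about.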

Regarding python - Unable to make requests to a Kafka container from another container using kafka-python, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/56724662/
