gpt4 book ai didi

python - 无法连接到 ID 为 1 的节点 : [Worker]: Error: ConnectionError ('No connection to node with id' )

转载 作者:太空宇宙 更新时间:2023-11-03 19:50:01 25 4
gpt4 key购买 nike

我正在尝试使用 robinhood/faust 但没有成功!

我已经创建了一个生产者,它可以成功地插入到我的 confluence-kafka 本地主机实例中的原始主题中!

但是 faust 无法连接到本地主机。

我的应用程序.py:

import faust
import base64
import random
from datetime import datetime


SOURCE_TOPIC="input_msgs"
TARGET_TOPIC="output_msgs"

app = faust.App("messages-stream",
broker="kafka://"+'localhost:9092',
topic_partitions=1,
store="memory://")

class OriginalMessage(faust.Record):
msg: str


class TransformedMessage(faust.Record):
msg_id: int
msg_data: str
msg_base64: str
created_at: float
source_topic: str
target_topic: str
deleted: bool

topic = app.topic(SOURCE_TOPIC, value_type=OriginalMessage)
out_topic = app.topic(TARGET_TOPIC, partitions=1)

table = app.Table(
"output_msgs",
default=TransformedMessage,
partitions=1,
changelog_topic=out_topic,
)

print('Initializing Thread Processor...')


@app.agent(topic)
async def transformedmessage(messageevents):
async for transfmessage in messageevents:
try:

table[transfmessage.msg_id] = random.randint(1, 999999)
table[transfmessage.msg_data] = transfmessage.msg
table[transfmessage.msg_base64] = base64.b64encode(transfmessage.msg)
table[transfmessage.created_at] = datetime.now().isoformat()
table[transfmessage.source_topic] = SOURCE_TOPIC
table[transfmessage.target_topic] = TARGET_TOPIC
table[transfmessage.deleted] = False

except Exception as e:
print(f"Error: {e}")


if __name__ == "__main__":
app.main()

错误

[2020-01-24 18:05:36,910] [55712] [ERROR] Unable connect to node with id 1: [Errno 8] nodename nor servname provided, or not known 
[2020-01-24 18:05:36,910] [55712] [ERROR] [^Worker]: Error: ConnectionError('No connection to node with id 1')

"No connection to node with id {}".format(node_id))
kafka.errors.ConnectionError: ConnectionError: No connection to node with id 1

我正在运行:faust -A app worker -l debug

最佳答案

我遇到了这个错误,幸运的是我在某种程度上预料到了它,所以找出问题并不难。

在 Confluence 中,您必须配置用于访问为您引导的所有 Kafka 代理的域。我真的不知道该域名有多重要,所以我只是随机添加一些内容,直到我陷入困境。

当然,我和你一样被困在这里,所以我启动了 Wireshark 来看看 Faust 和引导服务器之间发生了什么。事实证明,引导对话是这样的:

..............faust-1.10.4..PLAIN <-- client name
................PLAIN <-- authentication protocol
....foobar.foobar.foobar2000. <-- credentials!
b0.svs.cluster.local..#.......b1.svs.cluster.local..# <-- individual Kafka brokers

这些遵循我选择的域名模式,并在 Confluence 文档中进行了描述:https://docs.confluent.io/current/installation/operator/co-endpoints.html

如果这些名称无法解析,您会在此处收到模糊错误,因为尽管引导成功,但 Kafka 客户端实际上无法连接到端点。因此,选择您实际控制的域,或者将您需要的答案放入本地 /etc/hosts 或等效文件中。

17  123.111.222.1 b0.svs.cluster.local
18 123.111.222.2 b1.svs.cluster.local

重启Faust后,bootstrap与Kafka连接成功。

关于python - 无法连接到 ID 为 1 的节点 : [Worker]: Error: ConnectionError ('No connection to node with id' ),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59903780/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com