- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我正在尝试使用 robinhood/faust 但没有成功!
我已经创建了一个生产者,它可以成功地插入到我的 confluence-kafka 本地主机实例中的原始主题中!
但是 faust 无法连接到本地主机。
我的应用程序.py:
import faust
import base64
import random
from datetime import datetime
SOURCE_TOPIC="input_msgs"
TARGET_TOPIC="output_msgs"
app = faust.App("messages-stream",
broker="kafka://"+'localhost:9092',
topic_partitions=1,
store="memory://")
class OriginalMessage(faust.Record):
msg: str
class TransformedMessage(faust.Record):
msg_id: int
msg_data: str
msg_base64: str
created_at: float
source_topic: str
target_topic: str
deleted: bool
topic = app.topic(SOURCE_TOPIC, value_type=OriginalMessage)
out_topic = app.topic(TARGET_TOPIC, partitions=1)
table = app.Table(
"output_msgs",
default=TransformedMessage,
partitions=1,
changelog_topic=out_topic,
)
print('Initializing Thread Processor...')
@app.agent(topic)
async def transformedmessage(messageevents):
async for transfmessage in messageevents:
try:
table[transfmessage.msg_id] = random.randint(1, 999999)
table[transfmessage.msg_data] = transfmessage.msg
table[transfmessage.msg_base64] = base64.b64encode(transfmessage.msg)
table[transfmessage.created_at] = datetime.now().isoformat()
table[transfmessage.source_topic] = SOURCE_TOPIC
table[transfmessage.target_topic] = TARGET_TOPIC
table[transfmessage.deleted] = False
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
app.main()
错误
[2020-01-24 18:05:36,910] [55712] [ERROR] Unable connect to node with id 1: [Errno 8] nodename nor servname provided, or not known
[2020-01-24 18:05:36,910] [55712] [ERROR] [^Worker]: Error: ConnectionError('No connection to node with id 1')
"No connection to node with id {}".format(node_id))
kafka.errors.ConnectionError: ConnectionError: No connection to node with id 1
我正在运行:faust -A app worker -l debug
最佳答案
我遇到了这个错误,幸运的是我在某种程度上预料到了它,所以找出问题并不难。
在 Confluence 中,您必须配置用于访问为您引导的所有 Kafka 代理的域。我真的不知道该域名有多重要,所以我只是随机添加一些内容,直到我陷入困境。
当然,我和你一样被困在这里,所以我启动了 Wireshark 来看看 Faust 和引导服务器之间发生了什么。事实证明,引导对话是这样的:
..............faust-1.10.4..PLAIN <-- client name
................PLAIN <-- authentication protocol
....foobar.foobar.foobar2000. <-- credentials!
b0.svs.cluster.local..#.......b1.svs.cluster.local..# <-- individual Kafka brokers
这些遵循我选择的域名模式,并在 Confluence 文档中进行了描述:https://docs.confluent.io/current/installation/operator/co-endpoints.html
如果这些名称无法解析,您会在此处收到模糊错误,因为尽管引导成功,但 Kafka 客户端实际上无法连接到端点。因此,选择您实际控制的域,或者将您需要的答案放入本地 /etc/hosts
或等效文件中。
17 123.111.222.1 b0.svs.cluster.local
18 123.111.222.2 b1.svs.cluster.local
重启Faust后,bootstrap与Kafka连接成功。
关于python - 无法连接到 ID 为 1 的节点 : [Worker]: Error: ConnectionError ('No connection to node with id' ),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59903780/
我正在开发一个 voip 调用应用程序。我需要做的是在接到来电时将 Activity 带到前台。我在应用程序中使用 Twilio,并在收到推送消息时开始调用。 问题是我试图在接到任何电话时显示 Act
我是一名优秀的程序员,十分优秀!