gpt4 book ai didi

python-3.x - Python 3.6 ZeroMQ (PyZMQ) asyncio pub sub Hello World

转载 作者:行者123 更新时间:2023-12-03 17:34:36 25 4
gpt4 key购买 nike

我刚刚开始使用 ZeroMQ,我正在尝试让 Hello World 在 Python 3.6 中与 PyZMQ 和 asyncio 一起使用。我正在尝试将模块的功能与发布/订阅代码解耦,因此以下类设置:

编辑 1 : 最小化的例子

编辑 2 :包含的解决方案,请参阅下面的答案以了解如何。

import asyncio
import zmq.asyncio
from zmq.asyncio import Context

# manages message flow between publishers and subscribers
class HelloWorldMessage:
def __init__(self, url='127.0.0.1', port='5555'):
self.url = "tcp://{}:{}".format(url, port)
self.ctx = Context.instance()

# activate publishers / subscribers
asyncio.get_event_loop().run_until_complete(asyncio.wait([
self.pub_hello_world(),
self.sub_hello_world(),
]))

# generates message "Hello World" and publish to topic 'world'
async def pub_hello_world(self):
pub = self.ctx.socket(zmq.PUB)
pub.connect(self.url)

# message contents
msg = "Hello World"
print(msg)

# keep sending messages
while True:
# --MOVED-- slow down message publication
await asyncio.sleep(1)

# publish message to topic 'world'
# async always needs `send_multipart()`
await pub.send_multipart([b'world', msg.encode('ascii')]) # WRONG: bytes(msg)

# processes message "Hello World" from topic 'world'
async def sub_hello_world(self):
sub = self.ctx.socket(zmq.SUB)
sub.bind(self.url)
sub.setsockopt(zmq.SUBSCRIBE, b'world')

# keep listening to all published message on topic 'world'
while True:
msg = await sub.recv_multipart()
# ERROR: WAITS FOREVER
print('received: ', msg)

if __name__ == '__main__':
HelloWorldMessage()

问题

仅使用上述代码 1 Hello World被打印,然后永远等待。如果我按 ctrl+c,我会收到以下错误:
python helloworld_pubsub.py

Hello World
^CTraceback (most recent call last):
File "helloworld_pubsub_stackoverflow.py", line 64, in <module>
HelloWorldMessage()
File "helloworld_pubsub_stackoverflow.py", line 27, in __init__
self.sub_hello_world(),
File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 454, in run_until_complete
self.run_forever()
File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 421, in run_forever
self._run_once()
File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 1395, in _run_once
event_list = self._selector.select(timeout)
File "/*path*/zeromq/lib/python3.6/selectors.py", line 445, in select
fd_event_list = self._epoll.poll(timeout, max_ev)
KeyboardInterrupt

版本: libzmq: 4.2.3 , pyzmq: 17.0.0 , Ubuntu 16.04
任何见解都值得赞赏。

最佳答案

我的代码有 2 个错误:

  • 正如@user3666197 所述, PUB/SUB 沟通原型(prototype)需要一些时间
    初始化(见他/她的回答)。我不得不搬家await asyncio.sleep(1)以上发布代码(await pub.send_multipart([b'world', msg.encode('ascii')]))
  • 我对消息进行了错误编码。 bytes(msg) --> msg.encode('ascii')

  • 这个答案与我的问题最密切相关,但是在实现 PyZMQ 时,请查看@user3666197 以了解某些设计选择。

    建议

    似乎 PyZMQ 在 asyncio.get_event_loop() 不会给出错误回溯,因此,将您的代码包装在 try 中& except block ,例如:
    import traceback
    import logging

    try:
    while True:
    msg_received = await sub.recv_multipart()
    # do other stuff

    except Exception as e:
    print("Error with sub world")
    logging.error(traceback.format_exc())

    关于python-3.x - Python 3.6 ZeroMQ (PyZMQ) asyncio pub sub Hello World,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49294156/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com