gpt4 book ai didi

python - pyzmq 发布者可以从类实例进行操作吗?

转载 作者:太空宇宙 更新时间:2023-11-03 15:58:51 26 4
gpt4 key购买 nike

我有以下发布者代码,它实例化一些类实例并发布一些消息。

但是,我在订阅者端没有收到任何内容。

发布商

import zmq
import time
from multiprocessing import Process

class SendData:
def __init__(self, msg, port):
self.msg = msg
self.port = port
ctx = zmq.Context()
self.sock = ctx.socket(zmq.PUB)
self.sock.bind('tcp://127.0.0.1:'+str(self.port))
time.sleep(1)

def sender(self):
self.sock.send_json(self.msg)

def main():
for device, port in zip(['2.2.2.2', '5.5.5.5'],[5001, 5002]):
msg = {device:'Some random message'}
instance = SendData(device, port)
Process(target=instance.sender).start()

if __name__ == "__main__":
main()

订阅者

import zmq

ctx = zmq.Context()
recv_sock1 = ctx.socket(zmq.SUB)
recv_sock1.connect('tcp://127.0.0.1:5001')
recv_sock1.setsockopt(zmq.SUBSCRIBE, '')

recv_sock2 = ctx.socket(zmq.SUB)
recv_sock2.connect('tcp://127.0.0.1:5002')
recv_sock2.setsockopt(zmq.SUBSCRIBE, '')

while True:
if recv_sock1.poll(10):
msg = recv_sock1.recv_json()
print msg

if recv_sock2.poll(10):
msg = recv_sock2.recv_json()
print msg

在发布商发布任何内容之前,我就已经开始订阅了。另外,我可以看到 TCP 连接处于已建立状态,因此已建立连接。

  • pyzmq 版本 16.0.0
  • Python 版本 2.7

问题1:类实例是否支持 0mq 发布者?
问题2:我错过了什么?

最佳答案

正如之前所说,尝试在进程之间共享 ZeroMQ 上下文是这里的问题,user3666197 的解决方案将起作用。但是,我建议在这种情况下子类化 multiprocessing.Process 。这样,代码的哪一部分在哪个进程中执行就更清楚了。它还使您的代码更具可读性和可重用性。

以下代码为每个设备创建一个发送方进程。发送者进程可以在程序运行时重用以发送更多数据:

import multiprocessing
import queue
import zmq
import time

class Sender(multiprocessing.Process):

def __init__(self, port):
super(Sender, self).__init__()
self._port = port
self._messages = multiprocessing.Queue()
self._do_stop = multiprocesing.Event()

def run(self):
"""
This code is executed in a new process.
"""
ctx = zmq.Context()
sock = ctx.socket(zmq.PUB)
sock.bind("tcp://127.0.0.1:" + str(self._port))
while not self._do_stop.is_set():
try:
msg = self._message.get_nowait()
sock.send_json(msg)
except queue.Empty:
time.sleep(0.01)

def stop(self):
self._do_stop.set()

def send_message(self, msg):
self._messages.put(msg)

def main():
data = zip(['2.2.2.2', '5.5.5.5'], [5001, 5002])
# create senders
senders = {device: Sender(port) for device, port in data}
# start senders
for device in senders:
senders[device].start()
# send messages
for device, port in zip(['2.2.2.2', '5.5.5.5'],[5001, 5002]):
msg = {device: 'Some random message'}
senders[device].send_message(msg)
# do more stuff here....
# ... e.g. send more messages
# ...
# once you are finished, stop the subprocesses
for device in senders:
senders[device].stop()

希望这有助于解决您的问题。

关于python - pyzmq 发布者可以从类实例进行操作吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40521060/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com