gpt4 book ai didi

zeromq - ZMQ/0MQ如何实现多个发布者和订阅者?

转载 作者:行者123 更新时间:2023-12-05 04:06:35 31 4
gpt4 key购买 nike

如何创建允许多个发布者和这些发布者的多个订阅者的网络?

还是绝对有必要使用消息代理?

import time
import zmq
from multiprocessing import Process

def bind_pub(sleep_seconds, max_messages, pub_id):
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5556")

message = 0
while True:
socket.send_string("1 sending_func=bind_pub message_number=%s pub_id=%s" % (message, pub_id))
message += 1
if message >= max_messages:
break
time.sleep(sleep_seconds)

def bind_sub(sleep_seconds, max_messages, sub_id):
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.bind("tcp://*:5556")
socket.setsockopt_string(zmq.SUBSCRIBE, '1')

message_n = 0
while True:
message = socket.recv_string()
print(message + " receiving_func=bind_sub sub_id=%s" % sub_id)
message_n += 1
if message_n >= max_messages - 1:
break
time.sleep(sleep_seconds)

def conect_pub(sleep_seconds, max_messages, pub_id):
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.connect("tcp://localhost:5556")

message = 0
while True:
socket.send_string("1 sending_func=conect_pub message_number=%s pub_id=%s" % (message, pub_id))
message += 1
if message >= max_messages:
break
time.sleep(sleep_seconds)

def connect_sub(sleep_seconds, max_messages, sub_id):
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5556")
socket.setsockopt_string(zmq.SUBSCRIBE, '1')

message_n = 0
while True:
message = socket.recv_string()
print(message + " receiving_func=connect_sub sub_id=%s" % sub_id)
message_n += 1
if message_n >= max_messages - 1:
break
time.sleep(sleep_seconds)

尝试使用 bind_pub、connect_pub、connect_sub、connect_sub 网络架构时:

# bind_pub, connect_pub, connect_sub, connect_sub
n_messages = 4
p1 = Process(target=bind_pub, args=(1,n_messages,1))
p2 = Process(target=conect_pub, args=(1,n_messages,2))
p3 = Process(target=connect_sub, args=(0.1,n_messages,1))
p4 = Process(target=connect_sub, args=(0.1,n_messages,2))
p1.start()
p2.start()
p3.start()
p4.start()
p1.join()
p2.join()
p3.join()
p4.join()

pub_id=2 消息丢失的结果:

1 sending_func=bind_pub message_number=1 pub_id=1 receiving_func=connect_sub sub_id=2
1 sending_func=bind_pub message_number=1 pub_id=1 receiving_func=connect_sub sub_id=1
1 sending_func=bind_pub message_number=2 pub_id=1 receiving_func=connect_sub sub_id=2
1 sending_func=bind_pub message_number=2 pub_id=1 receiving_func=connect_sub sub_id=1
1 sending_func=bind_pub message_number=3 pub_id=1 receiving_func=connect_sub sub_id=1
1 sending_func=bind_pub message_number=3 pub_id=1 receiving_func=connect_sub sub_id=2

类似地运行 connect_pub、connect_pub、connect_sub、bind_sub 架构:

# connect_pub, connect_pub, connect_sub, bind_sub
n_messages = 4
p1 = Process(target=conect_pub, args=(1,n_messages,1))
p2 = Process(target=conect_pub, args=(1,n_messages,2))
p3 = Process(target=bind_sub, args=(0.1,n_messages,1))
p4 = Process(target=connect_sub, args=(0.1,n_messages,2))
p1.start()
p2.start()
p3.start()
p4.start()
p1.join()
p2.join()
p3.join()
p4.join()

sub_id=2 没有收到任何消息:

1 sending_func=conect_pub message_number=1 pub_id=1 receiving_func=bind_sub sub_id=1
1 sending_func=conect_pub message_number=1 pub_id=2 receiving_func=bind_sub sub_id=1
1 sending_func=conect_pub message_number=2 pub_id=1 receiving_func=bind_sub sub_id=1

最佳答案

好吧,
公平地说,ZeroMQ 主要是一个无代理框架,

这意味着第二个问题是先验解决的 - 不,它不仅不是 绝对必要,而且原则上也是不可能的(如果不实现 < em>Broker-(半)持久化作为 Zen-of-Zero 标准 ZeroMQ 工具的基础层一个额外的附加组件)。


接下来,
ZeroMQ 工具远不是您所知道的“套接字”-s:

这是一个经常被重新表述的误解,所以让我以粗体重复一遍。

注意:
ZeroMQ Socket()-instance 不是一个 tcp-socket-as-you-know-it。关于主要概念差异的最佳阅读 ZeroMQ hierarchy in less than a five seconds 或此处的其他帖子和讨论。


然而,
更重要的是,
似乎没有未涵盖的明确需求:

ZeroMQ 可以服务于所有:

many-PUB-s : many-SUB-s           -or-  
one-PUB : many-SUB-s -or- even
many-PUB-s : one-SUB

那些“许多”的全部或部分仍然可以.connect()-ed 到一个或多个接入点,所以生成的拓扑结构可能会变得非常疯狂(有关详细信息,请查看上面提供的 link to a "five seconds" read )因此,自己的想象力似乎是这样做的唯一上限。

有关性能和延迟范围,请随时 seek and read more in other posts.

关于zeromq - ZMQ/0MQ如何实现多个发布者和订阅者?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49768777/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com