gpt4 book ai didi

c++ - ZeroMQ 并发发布和订阅

转载 作者:行者123 更新时间:2023-11-28 05:40:17 27 4
gpt4 key购买 nike

我正在开发一个 C++ 程序,该程序需要能够从任意数量的其他客户端发送/接收 JSON-payload。

起初,我尝试实现 PubNub 服务,但我发现我无法同时获取和发布消息(即使在不同的线程上使用两个不同的上下文)。我需要能够做到这一点。我还发现 PubNub 的延迟太长,我不喜欢。

我遇到了 ZeroMQ具有满足我需要的 PUB/SUB 模型的库。但是所有examples我无意间解释了如何以一个进程是发布者或订阅者的方式来实现它,而不是同时两者

理想情况下,我想编写一个服务器,将来自任何人的所有消息转发给订阅消息中指定的特定 channel 的任何人。 任何人都应该能够接收消息并向网络上的任何其他人发布消息,前提是他们订阅了正确的 channel 。


UPDATE 1:

注意:我不需要接收保险,因为有效载荷 N+1 将优先于有效载荷 N。我想要一个发送后忘记的通信方式(类似 UDP)。

根据要求:PubNub 限制 32 kB 每个 JSON-payload 对我来说是完美的,我不需要更多。事实上,我的有效载荷平均约为 4 kB。所有客户端实例都将在同一个本地网络上运行,因此理想情况下,延迟应小于 5 毫秒。至于客户端数量,一次不会超过 4 个客户端订阅同一个 channel /主题。


UPDATE 2 :

我无法提前预测会有多少个 channel /主题,但大约有几十个(大部分时间)、数百个(高峰期)。不是几千。


问题:

问题 1: - 我可以使用 ZeroMQ 实现这样的行为吗?
Q2: - 是否有任何工作示例可以证明这一点(最好是 C++)?
Q3: - 如果没有,对 C++ 中的库有什么建议吗?


pubsub architecture

最佳答案

ZeroMQ : is capable of serving this task well within scales given above
nanomsg : is capable of serving this task too, a need to cross-check ports/bindings for clients

Design review:

  • client 实例不是持久的,可能会自行出现,可能会自行消失或因错误而消失
  • client 实例自行决定它要发送给 PUB 的内容-作为消息负载
  • client 实例自行决定它要发送给 SUB 的内容-scribe 作为实际传入的消息流 TOPIC -过滤器
  • client 实例交换(发送),它自己,一个普通的,非多部分的,JSON - 它准备/制作的格式化消息
  • client 实例收集(接收)它假设在相同的、非多部分的消息,JSON -格式化的形状,并且在接收完成后将尝试对其进行本地处理
  • 客户端实例的最大数量不超过数百个
  • 任何 JSON 的最大尺寸-格式化的payload小于 32 kB ,关于 4 kB平均
  • 跨公共(public) LAN 冲突域的 E2E 进程到进程交付可接受的最大延迟低于 5,000 [usec]
  • server 实例是一个中心角色和持久实体
  • server 实例提供了一个已知的传输类 URL -所有迟到者的目标.connect() -s

Proposal:

server may deploy multiple behaviours to meet the given goals, using both the PUB and SUB behaviours, and provides a code-driven, fast, SUB-side attached, non-blocking event-loop .poll() with aligned re-transmission of any of it's SUB-side .recv()-ed payloads to it's PUB-side, currently .connect()-ed, audience ( live client instances ):

set s_SUB_recv = aZmqCONTEXT.socket( zmq.SUB );
and s_PUB_send = aZmqCONTEXT.socket( zmq.PUB );

for performance reasons, that are not so tough here, one may also segregate workload-streams' processing by mapping each one on disjunct sub-sets of the multiple created I/O-threads:

map s_SUB_recv.setsockopt( ZMQ_AFFINITY, 0 );
and s_PUB_send.setsockopt( ZMQ_AFFINITY, 1 );

set s_PUB_send.bind( "tcp://localhost:8899" );
+
set s_SUB_recv.setsockopt( ZMQ_SUBSCRIBE, "" ); // forever *every*-TOPIC
set s_SUB_recv.setsockopt( ZMQ_MAXMSGSIZE, 32000 ); // protective ceiling
set s_SUB_recv.setsockopt( ZMQ_CONFLATE, True ); // retain just the last msg
set s_SUB_recv.setsockopt( ZMQ_LINGER, 0 ); // avoid blocking
set s_SUB_recv.setsockopt( ZMQ_TOS, anAppToS_NETWORK_PRIO_CODE );

and s_SUB_recv.bind( "tcp://localhost:8888" ); // [PUB]s .connect()


Similarly,
client instance may deploy a reverse-facing tandem of both a PUB-endpoint and SUB-endpoint, ready to .connect() to a known transport-target-URL.

The client specific subscription locally decides, what is to get filtered from the incoming stream of messages ( prior to ZeroMQ v.3.1 API the plentitude of all messages will get delivered to each client instance over the transport class, however since API v.3.1+, the topic-filter is being operated on the PUB-side, which in the desired modus-operandi eliminates the wasted volumes of data over the network, but at the same time, this increases the PUB-side processing overhead ( ref.: remarks on a principle of increased multi-I/O-threads mapping / performance boost above )

set c_SUB_recv = aZmqCONTEXT.socket( zmq.SUB );
and c_PUB_send = aZmqCONTEXT.socket( zmq.PUB );

unless the payload-assembly/processing overhead grows close to the permitted End-to-End latency threshold, there shall be no need to separate / segregate the ZeroMQ low-level I/O-threads here:
map c_SUB_recv.setsockopt( ZMQ_AFFINITY, 0 );
and c_PUB_send.setsockopt( ZMQ_AFFINITY, 1 );

set c_PUB_send.connect( "tcp://server:8888" ); // reverse .bind on [SUB]
+
set c_SUB_recv.setsockopt( ZMQ_SUBSCRIBE, "" ); // modified on-the-fly
set c_SUB_recv.setsockopt( ZMQ_MAXMSGSIZE, 32000 ); // protective ceiling
set c_SUB_recv.setsockopt( ZMQ_CONFLATE, True ); // take just last
set c_SUB_recv.setsockopt( ZMQ_LINGER, 0 ); // avoid blocking
set c_SUB_recv.setsockopt( ZMQ_TOS, anAppToS_NETWORK_PRIO_CODE );
and c_SUB_recv.connect( "tcp://server:8899" );


讨论:

对于业余项目,消息基础设施不需要太多,然而对于更严肃的领域,服务器客户端实例应该有额外的服务添加了一些进一步的正式交流模式行为。
- r/KBD 用于远程键盘,带有 CLI -类似的临时检查工具
- KEEP_ALIVE 允许全系统状态/性能监控的转发器
- SIG_EXIT 允许系统范围/实例特定的处理程序 SIG_EXIT
- distributed syslog 服务允许安全地收集/存储日志记录的非阻塞拷贝(无论是在调试阶段还是性能-tuninc 阶段或生产级证据记录收集)

- Identity Management 审计追踪等工具

- WhiteList/BlackList 为基础设施增加稳健性,使其更好地抵抗 DoS 攻击/中毒、错误的 NIC 流量突发等

- Adaptive Node re-Discovery 用于更智能/临时基础设施设计和状态监控,或当多角色/( N + M )-shaded active hot-standby 角色切换/接管场景等出现时

总结

A1 : 是的,完全在 ZeroMQ 能力范围内
A2 :是的,ZeroMQ 书籍/指南中的 C++ 代码示例可用
A3 : Ref.: A1, plus may like indepth remark in Martin SUSTRIK's post on "Differences between nanomsg and ZeroMQ "

希望您能享受分布式处理的力量,ZeroMQ 支持它或 nanomsg或两者兼而有之。

只有自己的想象力才是极限。

如有兴趣further details, one might love the book referred to in the The Best Next Step section of this post

关于c++ - ZeroMQ 并发发布和订阅,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37265557/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com