gpt4 book ai didi

python - ZeroMQ:订阅者订阅的主题数量有限制吗?

转载 作者:行者123 更新时间:2023-11-28 18:22:37 26 4
gpt4 key购买 nike

我正在使用 ZeroMQ 的 PUB/SUB 套接字模式。 PUB 制作并发布股票的财务数据。主题设置为每只股票的代码。在SUB端,客户可以根据股票代码订阅自己想要的数据。 PUB 是用 C 编写的,SUB 是用 Python 编写的。

但是,在测试过程中出现了问题。如果在 SUB 套接字上只设置一个股票代码作为消息过滤器,则一切正常。但是当涉及到大量股票时,程序会在短时间内崩溃,报错“Segmentation fault (core dumped)”(详见下文)。

这是 PUB (C) 的代码:

while (1) {  
int rc = 0;
// send topic
rc = zmq_send(pub_socket, topic, rc, ZMQ_SNDMORE);
if (rc == -1) {
// error handling
}
// send stock data
rc = zmq_send(pub_socket, data, rc, 0);
if (rc == -1) {
// error handling
}
}

这是 SUB (Python) 的代码:

import zmq

# initialize a SUB socket
ctx = zmq.Context.instance()
socket = ctx.socket(zmq.SUB)

# set socket options to filter message
for code in code_list:
socket.setsockopt_string(zmq.SUBSCRIBE, code)

socket.connect(PUB_ADDR)
# recv data from PUB
while True:
data = socket.recv()
print(data)

我还用gdb调试了程序。 调试结果如下: debug result enter image description here

有谁知道程序崩溃的原因?欢迎任何帮助,谢谢。


更新:

如果我用以下代码替换 setsockopt_string 部分,Python 脚本运行良好。奇怪的是...我需要更深入地研究 setsockopt_string 函数。

Python 中的新代码:

socket.setsockopt_string( zmq.SUBSCRIBE, "" )

最新更新:

我运行了@user3666197 提供的脚本并得到了调试日志。由于日志很长,我只选择了其中的几个部分。

套接字初始化 Initializationsetsockopt_string 完成 setsockopt_string finished接收一条消息并退出 recv one msg and exited

最佳答案

介绍:

PUB-端使用 ZeroMQ v 4.1.5;SUB-side 使用 ZeroMQ Python wrapper 16.0.2

这隐含地使 PUB/SUB 模式依赖于 PUB,这与前几代 API 回到 v 2.0 形成对比端过滤,而您的 SIGSEGV 指示报告了 SUB 端的问题。

尽管假设过滤是根本原因,但我记得一些关于大树过滤问题的技术辩论,仍然有一个小惊喜,就像在一些关于 Trie 搜索的帖子中,添加的 "" leaf-node 也做了一个神奇的服务。如果有帮助,将尝试再次找到这场辩论。

初始remarks from Martin Sustrik refer up to ZeroMQ 过滤器中大约 10,000 个订阅不会产生问题(在进一步的设计讨论中有一些更高的数字):

Efficient Subscription Matching

In ZeroMQ, simple tries are used to store and match PUB/SUB subscriptions. The subscription mechanism was intended for up to 10,000 subscriptions where simple trie works well. However, there are users who use as much as 150,000,000 subscriptions. In such cases there's a need for a more efficient data structure. Thus, nanomsg uses memory-efficient version of Patricia trie instead of simple trie.

For more details check this article.


始终至少使用循序渐进的方法来诊断原因:

一个轻微的测试修改将使您更接近于打开问题的真实信封:

import zmq
pass; print "DEBUG: Ok, zmq imported. [ver:{0:}]".format( zmq.pyzmq_version() )
#_______________________________________________# SETUP ZMQ:
ctx = zmq.Context( 2 ) # Context( nIOthreads )
pass; print "DEBUG: Ok, zmq.Context() instantiated."
socket = ctx.socket( zmq.SUB ) # Socket( .SUB )
pass; print "DEBUG: Ok, Socket instantiated."
socket.connect( PUB_ADDR ) # .connect()
pass; print "DEBUG: Ok, .connect() completed."
socket.setsockopt( zmq.LINGER, 0 ) # explicit LINGER
pass; print "DEBUG: Ok, .setsockopt( LINGER, 0 ) completed."
#_______________________________________________# SET FILTER:
for code in code_list:
pass; print "DEBUG: Going to set SUB side n-th filter: {0: > 1000d}. == [{1:}]".format( code_list.index( code ), repr( code ) ),
socket.setsockopt_string( zmq.SUBSCRIBE, code )
pass; print "DEBUG: Ok, this one was done."
pass; print "DEBUG: Ok, all items from <code_list> exhausted."
#_______________________________________________# LOOP FOREVER:
while True:
try:
print "LOOP: .recv() call."
data = socket.recv()
print "LOOP: .recv()-ed {0:}[B] repr()-ed as [{1:}]".format( len( data ), repr( data ) )

except KeyboardInterrupt():
print "EXC: Ctrl-C will terminate."

except:
print "EXC: will terminate."

finally:
pass; print "DEBUG: Ok, finally: section entered:"
socket.close()
pass; print "DEBUG: Ok, Socket instance .close() call returned"
ctx.term()
pass; print "DEBUG: Ok, .Context() instance term()-ed"
break

鉴于描述的测​​试用例只有一个 PUB 和一个 SUB,另一个性能扩展&详细的缓冲区管理问题目前不会引发问题。在运行修改后的测试并发布简单的 DEBUG:log 后,将看到结果。

每秒发送大约 3k 条消息也不成问题。


更新:遗漏点 -- (1) Unicode 处理 + (2) 主题过滤器

(1) 如 DEBUG:log 中所示,您混合了 Unicode 和纯 C 字节数组。 These representations MUST match - system-wide (从 .send_string(),通过 .setsockopt_string(),直到 .recv_string())

data = socket.recv_string()           # AS YOUR DEBUG:log shows the b'mkt_bar...'

(2) 主题-过滤器必须匹配 - 否则消息会被分类为未订阅的消息...所以 u'abc .... ' 过滤器匹配 u'abc ....' 消息。否则:

setsockopt_string( option, optval, encoding='utf-8' )

An empty optval of length zero shall subscribe to all incoming messages. A non-empty optval shall subscribe to all messages beginning with the specified prefix. Multiple filters may be attached to a single ZMQ_SUB socket, in which case a message shall be accepted if it matches at least one filter.

上面提供的 DEBUG:log 片段显示(嗯,PrintScreens ... -- 请,下次复制/粘贴终端 ASCII,而不是图片,除非显示一些 GUI 功能,对吧?谢谢... ),您的主题过滤器在定义的意义上永远不会匹配。解决这个问题。全系统。

ZeroMQ 不应归咎于此,如果混合使用或调用接口(interface)错误,Unicode + C 字节数组根本无法工作并且会产生困惑。


结语:

如果仍然指责 ZeroMQ 主题过滤能力,最简单的 a/b 测试(dis)-approve Null-hypothesis 将运行完全相同的测试,但只有 5-topic-filter 元素就位。如果这两者都崩溃了,那么您关于容量相关限制的假设是错误的。

继续走!

关于python - ZeroMQ:订阅者订阅的主题数量有限制吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43981272/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com