gpt4 book ai didi

python - Pyzmq 高水位线在 pub 套接字上不起作用

转载 作者:行者123 更新时间:2023-12-01 08:44:30 25 4
gpt4 key购买 nike

根据 ZeroMQ 文档,一旦排队的消息数量达到高水位线,pub 套接字就会删除消息。

这在以下示例中似乎不起作用(是的,我确实在绑定(bind)/连接之前设置了 hwm):

import time
import pickle
from threading import Thread
import zmq

ctx = zmq.Context()

def pub_thread():
pub = ctx.socket(zmq.PUB)
pub.set_hwm(2)
pub.bind('tcp://*:5555')

i = 0
while True:
# Send message every 100ms
time.sleep(0.1)
pub.send_string("test", zmq.SNDMORE)
pub.send_pyobj(i)
i += 1

def sub_thread():
sub = ctx.socket(zmq.SUB)
sub.subscribe("test")
sub.connect('tcp://localhost:5555')
while True:
# Receive messages only every second
time.sleep(1)
msg = sub.recv_multipart()
print("Sub: %d" % pickle.loads(msg[1]))

t_pub = Thread(target=pub_thread)
t_sub = Thread(target=sub_thread)
t_pub.start()
t_sub.start()

while True:
pass

我在 pub 上发送消息的速度比在子套接字上读取消息的速度快 10 倍,hwm 设置为 2。我预计大约每 10 条消息就会收到一次。相反,我看到以下输出:

Sub: 0
Sub: 1
Sub: 2
Sub: 3
Sub: 4
Sub: 5
Sub: 6
Sub: 7
Sub: 8
Sub: 9
Sub: 10
Sub: 11
Sub: 12
Sub: 13
Sub: 14
...

所以我看到所有消息都到达,因此它们被保存在某个队列中直到我读取它们。在连接之前在子套接字上添加 hwm=2 时也是如此。

我做错了什么或者我误解了hwm

我使用pyzmq版本17.1.2

最佳答案

借用issue which I opened in Github的答案,我已将我的答案更新如下:

<小时/>

Messages are held in operating system's network buffers. I have found HWMs to be not that useful because of that. Here is modified code where subscriber misses messages:

import time
import pickle
import zmq
from threading import Thread
import os

ctx = zmq.Context()

def pub_thread():
pub = ctx.socket(zmq.PUB)
pub.setsockopt(zmq.SNDHWM, 2)
pub.setsockopt(zmq.SNDBUF, 2*1024) # See: http://api.zeromq.org/4-2:zmq-setsockopt
pub.bind('tcp://*:5555')
i = 0
while True:
time.sleep(0.001)
pub.send_string(str(i), zmq.SNDMORE)
pub.send(os.urandom(1024))
i += 1

def sub_thread():
sub = ctx.socket(zmq.SUB)
sub.setsockopt(zmq.SUBSCRIBE, b'')
sub.setsockopt(zmq.RCVHWM, 2)
sub.setsockopt(zmq.RCVBUF, 2*1024)
sub.connect('tcp://localhost:5555')
while True:
time.sleep(0.1)
msg, _ = sub.recv_multipart()
print("Received:", msg.decode())

t_pub = Thread(target=pub_thread)
t_pub.start()
sub_thread()

Output looks something like this:

Received: 0
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Received: 6
Received: 47
Received: 48
Received: 64
Received: 65
Received: 84
Received: 85
Received: 159
Received: 160
Received: 270

Messages are missed because all queues/buffers are full and publisher starts to drop messages (see documentation for ZMQ_PUB: http://api.zeromq.org/4-2:zmq-socket).

<小时/>

[注意]:

  • 您应该在监听者/订阅者和广告商/发布者中使用高水位线选项。
  • 这些帖子也相关 ( Post1 - Post2 )
  • sock.setsockopt(zmq.CONFLATE, 1) 是另一种仅获取订阅者端定义的最后一条消息的选项。

关于python - Pyzmq 高水位线在 pub 套接字上不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53356451/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com