gpt4 book ai didi

python - ZMQ 删除旧消息

转载 作者:太空宇宙 更新时间:2023-11-03 19:55:51 27 4
gpt4 key购买 nike

我正在尝试创建一个现实生活中的系统,其中订阅者需要根据发布者提供的实时数据执行操作。有时,PUB 和 SUB 会不同步(最多 10 秒),因为它们正在执行某些操作,而我总是需要发布者提供的最新数据,否则订阅者执行的操作将会偏离。

我正在尝试使用 SUB/PUB 方法,并且我正在尝试设置 HWM 限制,但它似乎不起作用。我尝试过断开连接方法,但它给系统增加了额外的一秒延迟,并且我的系统 90% 的时间都是实时工作的,因此通过使用断开连接,整个系统会崩溃。

订阅者(我正在尝试通过 time.sleep() 对实际系统进行建模):

import time
import zmq
import random

context = zmq.Context()
consumer_receiver = context.socket(zmq.SUB)

consumer_receiver.set_hwm(0)
consumer_receiver.connect("tcp://127.0.0.1:5555")

consumer_receiver.subscribe(b'')


while 1:
d=random.randint(0,10)

work = consumer_receiver.recv_pyobj()
# consumer_receiver.disconnect()
print(work," :",d)
time.sleep(d)

发布者:

import time
import zmq

context = zmq.Context()
zmq_socket = context.socket(zmq.PUB)
zmq_socket.bind("tcp://127.0.0.1:5555")

for x in range(1000):

# zmq_socket.send_string("", zmq.SNDMORE)
zmq_socket.send_pyobj(x,zmq.NOBLOCK)
time.sleep(1)
print(x)

最佳答案

好吧我的救星是CONFLATE 。感谢this发帖问题似乎解决了

import time
import zmq
import random

context = zmq.Context()
consumer_receiver = context.socket(zmq.SUB)

consumer_receiver.setsockopt(zmq.CONFLATE, 1)

consumer_receiver.connect("tcp://127.0.0.1:5555")
consumer_receiver.subscribe(b'')


while 1:
d=random.randint(1,10)

work = consumer_receiver.recv_pyobj()

print(work," :",d)

time.sleep(d)

关于python - ZMQ 删除旧消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59542620/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com