gpt4 book ai didi

python - 为什么 ZMQ 不会丢弃消息?

转载 作者:行者123 更新时间:2023-11-28 19:21:36 28 4
gpt4 key购买 nike

我有一个应用程序,它使用 PUB/SUB 设置从 ZeroMQ 发布者获取消息。阅读器有时很慢,所以我在发送者和接收者上都设置了 HWM。我希望接收方会填充缓冲区并在从处理速度减慢中恢复时跳转以 catch 进度。但是我观察到的行为是它永远不会掉线! ZeroMQ 似乎忽略了 HWM。我做错了什么吗?

这是一个最小的例子:

发布者.py

import zmq
import time

ctx = zmq.Context()
sock = ctx.socket(zmq.PUB)

sock.setsockopt(zmq.SNDHWM, 1)

sock.bind("tcp://*:5556")

i = 0

while True:
sock.send(str(i))
print i
time.sleep(0.1)
i += 1

订阅者.py

import zmq
import time

ctx = zmq.Context()
sock = ctx.socket(zmq.SUB)
sock.setsockopt(zmq.SUBSCRIBE, "")
sock.setsockopt(zmq.RCVHWM, 1)
sock.connect("tcp://localhost:5556")

while True:
print sock.recv()
time.sleep(0.5)

最佳答案

我相信这里有几件事在起作用:

  1. High Water Marks are not exact (请参阅链接部分的最后一段)- 通常这意味着实际队列大小将小于您列出的数字,我不知道这在 1 时会如何表现>.
  2. 您的 PUB HWM 永远不会丢弃消息...由于 PUB 套接字的工作方式,它总是会立即处理消息是否是否有可用的订阅者。因此,除非 ZMQ 实际需要 0.1 秒来处理队列中的消息,否则您的 HWM 将永远不会在 PUB 端发挥作用。

应该发生的事情如下所示(我假设一个操作顺序可以让您实际收到第一条发布的消息):

  1. 启动 subscriber.py 并等待一段合适的时间以确保它完全启动(基本上立即启动)
  2. 启动publisher.py
  3. PUB处理并发送第一条消息,SUB接收并处理第一条消息
  4. PUB 休眠 0.1 秒并处理并发送第二条消息
  5. SUB 休眠 0.5 秒,套接字收到第二条消息,但在队列中等待下一次调用 sock.recv() 处理它
  6. PUB 休眠 0.1 秒并处理并发送第三条消息
  7. SUB 仍然休眠了 0.3 秒,因此第三条消息应该在第二条消息之后进入队列,这将使队列中有 2 条消息,而第三条消息应该丢弃,因为HWM

...等等等等

我建议进行以下更改以帮助解决问题:

  1. 删除发布商上的 HWM...它只会添加一个我们不需要在您的测试用例中处理的变量,因为我们从不期望它会改变任何东西。如果您的生产环境需要它,请将其重新添加并稍后在高容量场景中进行测试。
  2. 将订阅者上的 HWM 更改为 50。这会使测试花费更长的时间,但您不会处于极端情况,因为 ZMQ 文档指出 HWM 是确切地说,极端情况可能会导致意外行为。请注意,我相信你的测试(小数字)不会那样做,但我没有看过实现队列的代码所以我不能肯定地说,你的数据可能足够小您的有效 HWM 实际上更大
  3. 将您的订阅者 sleep 时间更改为 3 整秒...理论上,如果您的队列恰好容纳 50 条消息,您将在两个循环内使它饱和(就像您现在所做的那样),然后您将拥有等待 2.5 分钟来处理这些消息,看看您是否开始跳过,在前 50 条消息之后应该开始跳过大量数字。但我至少要等 5-10 分钟。如果您发现您在 100 或 200 条消息后开始跳过,那么您就会被数据量小所困扰。

这当然没有解决如果您仍然不跳过任何消息会发生什么......如果您这样做但仍然遇到同样的问题,那么我们可能需要深入研究高水位标记的实际工作原理,我们可能遗漏了什么。

关于python - 为什么 ZMQ 不会丢弃消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23800442/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com