gpt4 book ai didi

python - ZeroMQ:PUSH 上的 HWM 不起作用

转载 作者:太空宇宙 更新时间:2023-11-03 13:45:41 25 4
gpt4 key购买 nike

我正在尝试编写一个服务器/客户端脚本,其中包含一个用于执行任务的服务器和多个执行它的工作人员。问题是我的呼吸机有太多任务,它会在心跳中填满内存。我试图在绑定(bind)之前设置 HWM,但没有成功。一旦工作人员连接,它就会继续发送消息,完全无视设置的 HWM。我还有一个水槽,用于记录已完成的任务。

服务器.py

import zmq

def ventilate():
context = zmq.Context()

# Socket to send messages on
sender = context.socket(zmq.PUSH)
sender.setsockopt(zmq.SNDHWM, 30) #Big messages, so I don't want to keep too many in queue
sender.bind("tcp://*:5557")


# Socket with direct access to the sink: used to syncronize start of batch
sink = context.socket(zmq.PUSH)
sink.connect("tcp://localhost:5558")

print "Sending tasks to workers…"

# The first message is "0" and signals start of batch
sink.send('0')
print "Sent starting signal"

while True:
sender.send("Message")



if __name__=="__main__":
ventilate()

worker .py

import zmq
from multiprocessing import Process

def work():
context = zmq.Context()

# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

# Socket to send messages to
sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")

# Process t asks forever
while True:
msg = receiver.recv_msg()
print "Doing sth with msg %s"%(msg)
sender.send("Message %s done"%(msg))

if __name__ == "__main__":
for worker in range(10):
Process(target=work).start()

sink.py

import zmq

def sink():
context = zmq.Context()

# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.bind("tcp://*:5558")

# Wait for start of batch
s = receiver.recv()
print "Received start signal"
while True:
msg = receiver.recv_msg()
print msg


if __name__=="__main__":
sink()

最佳答案

好的,我试了一下,我认为问题不在于 PUSH HWM,而是您不能为 PULL 设置 HWM。如果你看this documentation ,您可以看到它在 HWM 上显示 N/A。

PULL 套接字似乎每个都接收数百条消息(我确实尝试设置一个 HWM 以防它在 PULL 套接字上做任何事情。它没有。)。我通过更改呼吸机以发送带有递增整数的消息,并将池中的每个工作人员更改为在调用 recv() 之间等待 2 秒来证明这一点。工作人员打印出他们正在处理具有截然不同的整数的消息。例如,一个工作人员将处理消息 10,而下一个工作人员正在处理消息 400。随着时间的推移,您会看到正在处理消息 10 的工作人员现在正在处理消息 11、12、13 等,而其他正在处理401、402等。

这向我表明 ZMQ_PULL 套接字正在某处缓冲消息。因此,虽然 ZMQ_PUSH 套接字确实具有 HWM,但 PULL 套接字正在快速请求消息,尽管它们实际上并未通过调用 recv() 来访问。因此,如果连接了 PULL 套接字,则 PUSH HWM 会被有效地忽略。据我所知,您无法控制 PULL 套接字缓冲区的长度(我希望 RCVHWM 套接字选项可以控制它,但它似乎没有)。

这种行为当然引出了 ZMQ_PULL HWM 选项的意义所在的问题,只有当您还可以控制接收套接字 HWM 时才有意义。

此时,我会开始询问 0MQ people您是否遗漏了一些明显的东西,或者这是否被视为错误。

抱歉,我不能提供更多帮助!

关于python - ZeroMQ:PUSH 上的 HWM 不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21133535/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com