
python - Why does this Python 0MQ script for distributed computing hang at a fixed input size?

Reprinted · Author: 太空狗 · Updated: 2023-10-30 00:00:15

I recently started learning 0MQ. Earlier today I came across a blog post, Python Multiprocessing with ZeroMQ. It discussed the ventilator pattern described in the 0MQ guide I had been reading, so I decided to give it a try.

Instead of merely having the workers compute products of numbers as the original code does, I decided to have the ventilator send large arrays to the workers via 0mq messages. Below is the code I used in my "experiment".

As noted in the comment in the code below, whenever I try to increase the variable string_length to a value larger than 3MB, the code hangs.

Typical symptom: say we set string_length to 4MB (i.e. 4194304); the result manager may receive a result from one worker, and then the code just pauses. htop shows the two cores aren't doing much, and the Etherape network traffic monitor shows no traffic on the lo interface either.

After several hours of looking around, I haven't been able to figure out what is causing this. I'd appreciate a hint or two about the cause and any possible solutions. Thanks!

I'm running Ubuntu 11.04 64-bit on a Dell laptop with an Intel Core processor, 8GB of RAM, an 80GB Intel X25MG2 SSD, Python 2.7.1+, libzmq1 2.1.10-1chl1~natty1, python-pyzmq 2.1.10-1chl1~natty1.

import time
import zmq
from multiprocessing import Process, cpu_count

np = cpu_count()
pool_size = np
number_of_elements = 128
# Odd, why once the slen is bumped to 3MB or above, the code hangs?
string_length = 1024 * 1024 * 3

def create_inputs(nelem, slen, pb=True):
    '''
    Generates an array that contains nelem fix-sized (of slen bytes)
    random strings and an accompanying array of hexdigests of the
    former's elements. Both are returned in a tuple.

    :type nelem: int
    :param nelem: The desired number of elements in the to be generated
                  array.
    :type slen: int
    :param slen: The desired number of bytes of each array element.
    :type pb: bool
    :param pb: If True, displays a text progress bar during input array
               generation.
    '''
    from os import urandom
    import sys
    import hashlib

    if pb:
        if nelem <= 64:
            toolbar_width = nelem
            chunk_size = 1
        else:
            toolbar_width = 64
            chunk_size = nelem // toolbar_width
        description = '%d random strings of %d bytes. ' % (nelem, slen)
        s = ''.join(('Generating an array of ', description, '...\n'))
        sys.stdout.write(s)
        # create an ASCII progress bar
        sys.stdout.write("[%s]" % (" " * toolbar_width))
        sys.stdout.flush()
        sys.stdout.write("\b" * (toolbar_width + 1))
    array = list()
    hash4a = list()
    try:
        for i in range(nelem):
            e = urandom(int(slen))
            array.append(e)
            h = hashlib.md5()
            h.update(e)
            he = h.hexdigest()
            hash4a.append(he)
            i += 1
            if pb and i and i % chunk_size == 0:
                sys.stdout.write("-")
                sys.stdout.flush()
        if pb:
            sys.stdout.write("\n")
    except MemoryError:
        print('Memory Error: discarding existing arrays')
        array = list()
        hash4a = list()
    finally:
        return array, hash4a
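The per-element hashing in create_inputs can be exercised on its own. A minimal, standard-library-only sketch of the digest step applied to each random string (the sample inputs here are illustrative):

```python
import hashlib
from os import urandom

def digest_of(e):
    # Same steps as in create_inputs: feed the bytes to md5, take the hexdigest
    h = hashlib.md5()
    h.update(e)
    return h.hexdigest()

print(digest_of(b'hello'))         # 5d41402abc4b2a76b9719d911017c592
print(len(digest_of(urandom(16)))) # always a 32-character hex string
```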

# The "ventilator" function generates an array of nelem fix-sized (of slen
# bytes long) random strings, and sends the array down a zeromq "PUSH"
# connection to be processed by listening workers, in a round robin load
# balanced fashion.

def ventilator():
    # Initialize a zeromq context
    context = zmq.Context()

    # Set up a channel to send work
    ventilator_send = context.socket(zmq.PUSH)
    ventilator_send.bind("tcp://127.0.0.1:5557")

    # Give everything a second to spin up and connect
    time.sleep(1)

    # Create the input array
    nelem = number_of_elements
    slen = string_length
    payloads = create_inputs(nelem, slen)

    # Send an array to each worker
    for num in range(np):
        work_message = { 'num' : payloads }
        ventilator_send.send_pyobj(work_message)

    time.sleep(1)
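Note that send_pyobj pickles the whole payloads tuple into a single message, so each worker receives one serialized blob holding all the strings plus their digests. A scaled-down sketch of measuring that serialized size with the standard pickle module (element counts and sizes here are illustrative, far smaller than the experiment's 128 x 3MB):

```python
import pickle
import hashlib
from os import urandom

# 4 elements of 1 KB instead of 128 elements of 3 MB
strings = [urandom(1024) for _ in range(4)]
digests = [hashlib.md5(s).hexdigest() for s in strings]

work_message = {'num': (strings, digests)}
blob = pickle.dumps(work_message, protocol=pickle.HIGHEST_PROTOCOL)
print(len(blob))  # a bit more than the 4 KB of raw string data
```

Scaling the element size back up to 3MB makes each pickled message hundreds of megabytes, which is what each PUSH send has to move in full.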

# The "worker" functions listen on a zeromq PULL connection for "work"
# (array to be processed) from the ventilator, get the length of the array
# and send the results down another zeromq PUSH connection to the results
# manager.

def worker(wrk_num):
    # Initialize a zeromq context
    context = zmq.Context()

    # Set up a channel to receive work from the ventilator
    work_receiver = context.socket(zmq.PULL)
    work_receiver.connect("tcp://127.0.0.1:5557")

    # Set up a channel to send result of work to the results reporter
    results_sender = context.socket(zmq.PUSH)
    results_sender.connect("tcp://127.0.0.1:5558")

    # Set up a channel to receive control messages over
    control_receiver = context.socket(zmq.SUB)
    control_receiver.connect("tcp://127.0.0.1:5559")
    control_receiver.setsockopt(zmq.SUBSCRIBE, "")

    # Set up a poller to multiplex the work receiver and control receiver channels
    poller = zmq.Poller()
    poller.register(work_receiver, zmq.POLLIN)
    poller.register(control_receiver, zmq.POLLIN)

    # Loop and accept messages from both channels, acting accordingly
    while True:
        socks = dict(poller.poll())

        # If the message came from the work_receiver channel, get the length
        # of the array and send the answer to the results reporter
        if socks.get(work_receiver) == zmq.POLLIN:
            work_message = work_receiver.recv_pyobj()
            length = len(work_message['num'][0])
            answer_message = { 'worker' : wrk_num, 'result' : length }
            results_sender.send_json(answer_message)

        # If the message came over the control channel, shut down the worker.
        if socks.get(control_receiver) == zmq.POLLIN:
            control_message = control_receiver.recv()
            if control_message == "FINISHED":
                print("Worker %i received FINISHED, quitting!" % wrk_num)
                break

# The "results_manager" function receives each result from multiple workers,
# and prints those results. When all results have been received, it signals
# the worker processes to shut down.

def result_manager():
    # Initialize a zeromq context
    context = zmq.Context()

    # Set up a channel to receive results
    results_receiver = context.socket(zmq.PULL)
    results_receiver.bind("tcp://127.0.0.1:5558")

    # Set up a channel to send control commands
    control_sender = context.socket(zmq.PUB)
    control_sender.bind("tcp://127.0.0.1:5559")

    for task_nbr in range(np):
        result_message = results_receiver.recv_json()
        print("Worker %i answered: %i" % (result_message['worker'], result_message['result']))

    # Signal to all workers that we are finished
    control_sender.send("FINISHED")
    time.sleep(5)

if __name__ == "__main__":

    # Create a pool of workers to distribute work to
    for wrk_num in range(pool_size):
        Process(target=worker, args=(wrk_num,)).start()

    # Fire up our result manager...
    result_manager = Process(target=result_manager, args=())
    result_manager.start()

    # Start the ventilator!
    ventilator = Process(target=ventilator, args=())
    ventilator.start()

Best Answer

The problem is that your ventilator (PUSH) socket is closing before it has finished sending. The one-second sleep at the end of the ventilator function isn't enough time to send a 384MB message. That is why you see the threshold you do; with a shorter sleep the threshold would be lower.
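The 384MB figure follows directly from the script's constants. A quick check of the raw payload size per message (before pickle overhead):

```python
number_of_elements = 128
string_length = 3 * 1024 * 1024   # the 3 MB threshold case

raw_bytes = number_of_elements * string_length
print(raw_bytes // (1024 * 1024))  # 384 (MB), sent once per worker
```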

That said, LINGER is supposed to prevent exactly this sort of thing, so I would raise it with zeromq: PUSH doesn't appear to respect LINGER.

A workaround for your particular example (without adding an indeterminately long sleep) is to terminate the ventilator with the same FINISHED signal your workers use. That way, you guarantee the ventilator stays alive for as long as it is needed.

The revised ventilator:

def ventilator():
    # Initialize a zeromq context
    context = zmq.Context()

    # Set up a channel to send work
    ventilator_send = context.socket(zmq.PUSH)
    ventilator_send.bind("tcp://127.0.0.1:5557")

    # Set up a channel to receive control messages
    control_receiver = context.socket(zmq.SUB)
    control_receiver.connect("tcp://127.0.0.1:5559")
    control_receiver.setsockopt(zmq.SUBSCRIBE, "")

    # Give everything a second to spin up and connect
    time.sleep(1)

    # Create the input array
    nelem = number_of_elements
    slen = string_length
    payloads = create_inputs(nelem, slen)

    # Send an array to each worker
    for num in range(np):
        work_message = { 'num' : payloads }
        ventilator_send.send_pyobj(work_message)

    # Poll for the FINISHED message, so we don't shut down too early
    poller = zmq.Poller()
    poller.register(control_receiver, zmq.POLLIN)

    while True:
        socks = dict(poller.poll())

        if socks.get(control_receiver) == zmq.POLLIN:
            control_message = control_receiver.recv()
            if control_message == "FINISHED":
                print("Ventilator received FINISHED, quitting!")
                break
            # else: unhandled message

Regarding python - Why does this Python 0MQ script for distributed computing hang at a fixed input size?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/8905147/
