- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
最近开始学习0MQ .今天早些时候,我遇到了一个博客,Python Multiprocessing with ZeroMQ .它谈到了the ventilator pattern在我读到的 0MQ 指南中,所以我决定试一试。
我决定尝试让呼吸机通过 0mq 消息向工作人员发送大型数组,而不是像原始代码那样只计算工作人员的数字乘积。以下是我在“实验”中使用的代码。
如以下评论所述,每当我尝试将变量 string_length 增加到大于 3MB 的数字时,代码都会挂起。
典型症状:假设我们将 string_length 设置为 4MB(即 4194304),然后结果管理器可能从一个 worker 那里获取结果,然后代码就暂停了。 htop 显示 2 个核心没有做太多事情。 Etherape 网络流量监视器也显示 lo 接口(interface)上没有流量。
到目前为止,经过几个小时的环顾四周,我还没有弄清楚是什么原因造成的,如果能提供一两个关于此问题的原因和任何解决方案的提示,我将不胜感激。谢谢!
我在戴尔笔记本电脑上运行 Ubuntu 11.04 64 位,配备英特尔酷睿处理器、8GB 内存、80GB 英特尔 X25MG2 固态硬盘、Python 2.7.1+、libzmq1 2.1.10-1chl1~natty1、python-pyzmq 2.1.10-1chl1 ~natty1
import time
import zmq
from multiprocessing import Process, cpu_count
np = cpu_count()
pool_size = np
number_of_elements = 128
# Odd, why once the slen is bumped to 3MB or above, the code hangs?
string_length = 1024 * 1024 * 3
def create_inputs(nelem, slen, pb=True):
'''
Generates an array that contains nelem fix-sized (of slen bytes)
random strings and an accompanying array of hexdigests of the
former's elements. Both are returned in a tuple.
:type nelem: int
:param nelem: The desired number of elements in the to be generated
array.
:type slen: int
:param slen: The desired number of bytes of each array element.
:type pb: bool
:param pb: If True, displays a text progress bar during input array
generation.
'''
from os import urandom
import sys
import hashlib
if pb:
if nelem <= 64:
toolbar_width = nelem
chunk_size = 1
else:
toolbar_width = 64
chunk_size = nelem // toolbar_width
description = '%d random strings of %d bytes. ' % (nelem, slen)
s = ''.join(('Generating an array of ', description, '...\n'))
sys.stdout.write(s)
# create an ASCII progress bar
sys.stdout.write("[%s]" % (" " * toolbar_width))
sys.stdout.flush()
sys.stdout.write("\b" * (toolbar_width+1))
array = list()
hash4a = list()
try:
for i in range(nelem):
e = urandom(int(slen))
array.append(e)
h = hashlib.md5()
h.update(e)
he = h.hexdigest()
hash4a.append(he)
i += 1
if pb and i and i % chunk_size == 0:
sys.stdout.write("-")
sys.stdout.flush()
if pb:
sys.stdout.write("\n")
except MemoryError:
print('Memory Error: discarding existing arrays')
array = list()
hash4a = list()
finally:
return array, hash4a
# The "ventilator" function generates an array of nelem fix-sized (of slen
# bytes long) random strings, and sends the array down a zeromq "PUSH"
# connection to be processed by listening workers, in a round robin load
# balanced fashion.
def ventilator():
# Initialize a zeromq context
context = zmq.Context()
# Set up a channel to send work
ventilator_send = context.socket(zmq.PUSH)
ventilator_send.bind("tcp://127.0.0.1:5557")
# Give everything a second to spin up and connect
time.sleep(1)
# Create the input array
nelem = number_of_elements
slen = string_length
payloads = create_inputs(nelem, slen)
# Send an array to each worker
for num in range(np):
work_message = { 'num' : payloads }
ventilator_send.send_pyobj(work_message)
time.sleep(1)
# The "worker" functions listen on a zeromq PULL connection for "work"
# (array to be processed) from the ventilator, get the length of the array
# and send the results down another zeromq PUSH connection to the results
# manager.
def worker(wrk_num):
# Initialize a zeromq context
context = zmq.Context()
# Set up a channel to receive work from the ventilator
work_receiver = context.socket(zmq.PULL)
work_receiver.connect("tcp://127.0.0.1:5557")
# Set up a channel to send result of work to the results reporter
results_sender = context.socket(zmq.PUSH)
results_sender.connect("tcp://127.0.0.1:5558")
# Set up a channel to receive control messages over
control_receiver = context.socket(zmq.SUB)
control_receiver.connect("tcp://127.0.0.1:5559")
control_receiver.setsockopt(zmq.SUBSCRIBE, "")
# Set up a poller to multiplex the work receiver and control receiver channels
poller = zmq.Poller()
poller.register(work_receiver, zmq.POLLIN)
poller.register(control_receiver, zmq.POLLIN)
# Loop and accept messages from both channels, acting accordingly
while True:
socks = dict(poller.poll())
# If the message came from work_receiver channel, get the length
# of the array and send the answer to the results reporter
if socks.get(work_receiver) == zmq.POLLIN:
#work_message = work_receiver.recv_json()
work_message = work_receiver.recv_pyobj()
length = len(work_message['num'][0])
answer_message = { 'worker' : wrk_num, 'result' : length }
results_sender.send_json(answer_message)
# If the message came over the control channel, shut down the worker.
if socks.get(control_receiver) == zmq.POLLIN:
control_message = control_receiver.recv()
if control_message == "FINISHED":
print("Worker %i received FINSHED, quitting!" % wrk_num)
break
# The "results_manager" function receives each result from multiple workers,
# and prints those results. When all results have been received, it signals
# the worker processes to shut down.
def result_manager():
# Initialize a zeromq context
context = zmq.Context()
# Set up a channel to receive results
results_receiver = context.socket(zmq.PULL)
results_receiver.bind("tcp://127.0.0.1:5558")
# Set up a channel to send control commands
control_sender = context.socket(zmq.PUB)
control_sender.bind("tcp://127.0.0.1:5559")
for task_nbr in range(np):
result_message = results_receiver.recv_json()
print "Worker %i answered: %i" % (result_message['worker'], result_message['result'])
# Signal to all workers that we are finsihed
control_sender.send("FINISHED")
time.sleep(5)
if __name__ == "__main__":
# Create a pool of workers to distribute work to
for wrk_num in range(pool_size):
Process(target=worker, args=(wrk_num,)).start()
# Fire up our result manager...
result_manager = Process(target=result_manager, args=())
result_manager.start()
# Start the ventilator!
ventilator = Process(target=ventilator, args=())
ventilator.start()
最佳答案
问题是您的呼吸机 (PUSH) socket 在发送完成之前正在关闭。您在呼吸机功能结束时睡了 1s
,这不足以发送 384MB 消息。这就是为什么你有阈值的原因,如果 sleep 时间较短,则阈值会更低。
就是说,LINGER 应该 可以防止这种事情发生,所以我会用 zeromq 提出这个问题:PUSH 似乎不尊重 LINGER。
针对您的特定示例(不添加不确定的长时间 sleep )的解决方法是使用与您的工作人员相同的 FINISH 信号来终止您的呼吸机。这样,您就可以保证您的呼吸机可以在需要的时间内存活。
修订后的呼吸机:
def ventilator():
# Initialize a zeromq context
context = zmq.Context()
# Set up a channel to send work
ventilator_send = context.socket(zmq.PUSH)
ventilator_send.bind("tcp://127.0.0.1:5557")
# Set up a channel to receive control messages
control_receiver = context.socket(zmq.SUB)
control_receiver.connect("tcp://127.0.0.1:5559")
control_receiver.setsockopt(zmq.SUBSCRIBE, "")
# Give everything a second to spin up and connect
time.sleep(1)
# Create the input array
nelem = number_of_elements
slen = string_length
payloads = create_inputs(nelem, slen)
# Send an array to each worker
for num in range(np):
work_message = { 'num' : payloads }
ventilator_send.send_pyobj(work_message)
# Poll for FINISH message, so we don't shutdown too early
poller = zmq.Poller()
poller.register(control_receiver, zmq.POLLIN)
while True:
socks = dict(poller.poll())
if socks.get(control_receiver) == zmq.POLLIN:
control_message = control_receiver.recv()
if control_message == "FINISHED":
print("Ventilator received FINSHED, quitting!")
break
# else: unhandled message
关于python - 为什么这个用于分布式计算的 Python 0MQ 脚本会在固定输入大小时挂起?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8905147/
我有一个测试即将进行,我想澄清两个有关参数的问题。 在我的笔记中指出,将参数传递给函数的推荐方法是使用“按引用传递” const type& x; // for "in" parameters
当我通过 OMG 2.5(Beta)推广的 UML 规范阅读以下概念时: in: Indicates that Parameter values are passed in by the caller
我试图在用户按下 Enter 时触发一个函数。我将此输入设置为只读的原因是限制用户在填充值后修改输入中的值。 该值来自将在点击属性中触发的弹出窗口。问题是 keyup.enter 没有触发该输入。 代
我在jQuery中使用模式弹出窗口控件,该弹出窗口具有由jQuery Tokenize输入插件提供动力的输入文本。问题是,当我在模式弹出文本框中键入内容时, token 化插件的搜索结果显示为隐藏在弹
我有一个问题。当我选中复选框时,系统工作正常,总值发生变化,但一旦我修改文本输入,它就会变为 0。我需要将此文本框输入排除在更改值之外。 这是 html: $15000 $
我正在努力让它发挥作用,但还是有些不对劲。 我想用 CSS 设置提交按钮的样式以匹配我已有的按钮。 风格: input[type="button"], input[type="submit"], b
import java.util.*;; public class selection { Scanner in=new Scanner(System.in); private
这可能是一个非常菜鸟的问题。假设我有一个带宽限制为 100MB/s 的网卡,那么输入/输出带宽是否有可能达到该限制 同时 ?或者我会在任何时候遇到这个不等式:in bandwidth + out ba
看着这个问题,Fill immutable map with for loop upon creation ,我很好奇是什么this表示在 Map(1 -> this) . scala> Map(1
我有这样的东西 一个 乙 问? 是或否 数字 数字或零 我想做的是: 如果 B1 = “Y”,则让用户在 B2 中输入一个数字。 如果 B1 = “N”,则将 B2 中的值更改为零,并且不允许用户在
我有一个包含许多列的表,我想添加 input标题单元格内的字段,但我希望输入适合根据正文内容的宽度。 这是没有 input 的样子领域: 这就是 input 的样子领域: 可以看出,像“index”和
关于为 FTP 客户端设置传出和传入文件夹,您遵循哪些最佳实践(如果有)?我们通常使用“outgoing”和“incoming”,但无论你如何表述方向,它都可以有两种解释方式,具体取决于名称相对于哪一
我正在尝试“求解”给定 d 的 Pell 方程:x^2 - d * y^2 = 1,或者至少我想得到最小的 x > 0 来求解方程。到目前为止,一切都很好。这是我的 Haskell 代码 minX :
我是VS Code的新手,可以使用Ctrl + Enter将代码运行到python交互式窗口中。我希望光标自动移动到下一行,因此我可以逐行浏览代码。 能做到吗? 最佳答案 如this blog pos
我正在创建一个 bool 值矩阵/二维数组,并且我想为 dategrid 推断一种不仅仅是“ANY”的类型。 let yearRange = [2000,2001,2002,2003,2004]; l
我有两个排序的列表,例如 a = [1, 4, 7, 8] b = [1, 2, 3, 4, 5, 6] 我想知道a中的每个项目是否在b中。对于上面的示例,我想找到 a_in_b = [True, T
菜鸟警报 这很奇怪 - 当我编写以下代码时,尝试在 AngularJS 中创建自定义指令: myModule.directive('myTab', function(){ console.lo
已关闭。此问题需要 debugging details 。目前不接受答案。 编辑问题以包含 desired behavior, a specific problem or error, and the
假设我正在使用 gdscript 静态类型,并且对于一个函数参数,我事先不知道我会得到什么。这就是 python 中 typing.Any 的作用。如何使用 gdscript 做到这一点? 似乎 Va
我使用 dropzone 上传多个图像,并且工作正常,直到我想为每个图像插入品牌和网址。 我遇到的唯一问题是,当我要从输入字段获取值时,我会从服务器获取来自字段(品牌、网址)的未定义值,但如果我使用静
我是一名优秀的程序员,十分优秀!