gpt4 book ai didi

python - 如何将任务添加到同一个Pool执行的任务中?

转载 作者:太空宇宙 更新时间:2023-11-03 18:49:39 25 4
gpt4 key购买 nike

假设我有一个由 multiprocessing.Pool 执行的任务。如何允许此任务将新任务添加到执行它的中?例如,

def integers(pool, queue, n1, n2):
print ("integers(%d)" % n)
queue.put(n)
pool.apply_async(integers, (pool, queue, n+1)) # crashes; can't pickle `pool`

def start():
pool = multiprocessing.Pool()
queue = multiprocessing.Queue()
integers(pool, queue, 1)
while True:
yield queue.get()

最佳答案

无法对进行pickle,因此如果您希望工作人员能够添加任务,则必须找到解决方法。

您可以使用特定的“哨兵”返回值来告诉主程序将新任务添加到中:

while True:
ret_value = queue.get()
if is_sentinel(ret_value):
pool.apply_asynch(*get_arguments(ret_value))
yield ret_value

当返回值要求您向get_arguments添加更多作业时,is_sentinel返回True是一个能够获取要传递给的参数的函数。

此类功能的最简单实现可能是:

def is_sentinel(value):
"""Assume only sentinel values are tuples, or sequences."""
return isinstance(value, tuple)
# or, using duck-typing
try:
len(value)
except TypeError:
return False
else:
return True


def get_arguments(value):
"""Assume that the sentinel value *is* the tuple of arguments
to pass to the Pool.
"""
return value
# or return value[1:] if you want to include a "normal" return value

传递给 apply_asynch 的函数仅在想要添加新任务时才返回一个元组(或序列),并且在此如果它不提供任何返回值。添加提供返回值的可能性非常简单(例如:元组的第一个元素可以是“正常”返回值)。

另一种方法可能是使用第二个队列,工作人员可以在其中放置他们的请求。在每次迭代中,您可以使用 get_nowait() 方法来查看工作人员是否请求在队列中添加更多作业。

<小时/>

使用第一种方法的示例:

def is_sentinel(value):
return isinstance(value, tuple)

def get_arguments(value):
return value


def integers(queue, n1, n2):
print("integers(%d)" % n1)
queue.put(n1)
if n1 < n2:
queue.put((integers, (queue, n1+1, n2)))

def start():
pool = multiprocessing.Pool()
queue = multiprocessing.Queue()
m = 0
n = 100
pool.apply_asynch(integers, (queue, m, n))
while True:
ret_val = queue.get()
if is_sentinel(ret_val):
pool.apply_asynch(*get_arguments(ret_val))
else:
yield ret_val
<小时/>

使用第二种方法的示例:

# needed for queue.Empty exception
import queue

def integers(out_queue, task_queue, n1, n2):
print("integers(%d)" % n1)
out_queue.put(n1)
if n1 < n2:
task_queue.put((n1+1, n2))


def start():
pool = multiprocessing.Pool()
out_queue = multiprocessing.Queue()
task_queue = multiprocessing.Queue()
task_queue.put((0, 100))
while True:
try:
# may be safer to use a timeout...
m, n = task_queue.get_nowait()
pool.apply_asynch(integers, (out_queue, task_queue, m, n))
except queue.Empty:
# no task to perform
pass
yield out_queue.get()

关于python - 如何将任务添加到同一个Pool执行的任务中?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18681322/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com