gpt4 book ai didi

python - 如何使用 apply_async 创建无限循环?

转载 作者:行者123 更新时间:2023-12-01 04:43:49 26 4
gpt4 key购买 nike

我有一个带有 apply_async 的进程池,其中不同的进程需要不同的时间来提供输出。一旦一个进程完成,我就会对其输出进行一些计算。在我想启动另一个进程之后。通过这种方式,我想创建一个无限循环,它启动进程,读取最近完成的进程的输出,进行一些计算并重新启动另一个进程。

到目前为止,我已经能够做我想做的事情,除了主进程卡在 get() 函数中。这是因为我不知道哪个进程终止了,因此我应该执行 get() 操作的结果条目。

一些尝试代码:

import multiprocessing as mp
import numpy as np
from time import sleep


def squared(x,y):
result = np.array((x,x))
if x%2 == 0:
sleep(2)
return result




if __name__ == "__main__":

pool = mp.Pool()

pool_r = []
for i in xrange(0,8):
pool_r.append(pool.apply_async(squared, (i,i)))

count_results = 0

for j in xrange(0,10):
result = pool_r[count_results].get()
print result
count_results += 1
pool_r.append(pool.apply_async(squared, (j,j)))

pool.close()
pool.join()

输出是: [0 0] [1 1] [2 2] [3 3] [4 4] [5 5] [6 6] [7 7] [0 0] [1 1]

而不是先奇数,然后偶数(因为这些数字有 sleep )。

有什么建议吗?

<小时/>

非常感谢您的快速回复 abarnert。

实际上,我想在进程完成后保持无限循环(我需要它们的结果能够进入循环)。

Q1 - 如果我创建一个包含 30 个作品的池,我可以提交超过 30 个进程吗?计算机会等待其中一个完成后才让另一个开始工作吗?

Q2 - 在您的代码中有一个回调函数。但是,当一个工作人员完成时我需要运行的代码必须位于主进程中,因为我必须更新将发送到我创建的新进程的变量。

Q3 - 主进程执行的代码占用了进程实现其任务所需时间的 10%。那么让主进程实现一些计算然后启动新进程是不是一个好方法呢?

Q4 - 现在如果我 Ctrl+C 代码只会在所有进程结束时终止。我该怎么做才能在执行 Ctrl+C 后立即终止代码?最后,在我发表评论之后,您认为 future 仍然是正确的选择吗?

一些我需要的伪代码:

launch several processes
wait for the results

launch several processes

while True:

get results from a recently finished process

do some calculations

launch two more processes

# some ending condition

最佳答案

问题是您正在按照作业发出的顺序等待结果,而不是按照作业完成的顺序。因此,如果作业 1 在作业 0 之前完成,也没关系;您仍在等待作业 0。

从根本上来说,问题在于 apply_async 返回 AsyncResult 对象,这些对象不是可组合的 future,但您希望像使用它们一样使用它们。你不能那样做。无法并行等待一堆 AsyncResult 直到其中一个完成。如果您想要的话,请使用 concurrent.futures相反,或者,对于 Python 2.7,PyPI 上的反向移植 futures ;然后您可以对任何 future 序列调用 wait,或者迭代 as_completed

可以通过使用回调而不是等待AsyncResult之上进行模拟,但这会让您的生活变得比实际需要的更加困难,因为你必须彻底改变控制流程。像这样的东西:

pool = mp.Pool() 

count_results = 0
def handle_result(result):
global count_results, done
print result
if count_results < 10:
pool.apply_async(squared, (count_results, count_results),
callback=handle_result)
elif count_results == 18:
pool.close()
count_results += 1

for i in xrange(0,8):
pool.apply_async(squared, (i,i), callback=handle_result)

pool.join()

关于python - 如何使用 apply_async 创建无限循环?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29941211/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com