gpt4 book ai didi

python - 在所有任务完成后运行任务

转载 作者:太空狗 更新时间:2023-10-29 17:47:46 24 4
gpt4 key购买 nike

我正在编写一个应用程序,它需要并行运行一系列任务,然后是一个包含所有任务运行结果的任务:

@celery.task
def power(value, expo):
return value ** expo

@celery.task
def amass(values):
print str(values)

这是一个非常做作和过于简单的例子,但希望大家能很好地理解这一点。基本上,我有许多 项需要通过power 运行,但我只想对所有任务的结果运行amass。所有这些都应该异步发生,我不需要从 amass 方法返回任何东西。

有谁知道如何在 celery 中进行设置,以便所有内容都异步执行,并在完成所有操作后调用包含结果列表的单个回调?

我已按照 Alexander Afanasiev 的建议将此示例设置为使用 chord 运行:

from time import sleep

import random

tasks = []

for i in xrange(10):
tasks.append(power.s((i, 2)))
sleep(random.randint(10, 1000) / 1000.0) # sleep for 10-1000ms

callback = amass.s()

r = chord(tasks)(callback)

不幸的是,在上面的示例中,tasks 中的所有任务仅在调用chord 方法时才启动。有没有一种方法可以让每个任务单独启动,然后我可以向组添加一个回调,以便在一切完成时运行?

最佳答案

这是一个适合我的目的的解决方案:

tasks.py:

from time import sleep

import random

@celery.task
def power(value, expo):
sleep(random.randint(10, 1000) / 1000.0) # sleep for 10-1000ms
return value ** expo

@celery.task
def amass(results, tasks):
completed_tasks = []
for task in tasks:
if task.ready():
completed_tasks.append(task)
results.append(task.get())

# remove completed tasks
tasks = list(set(tasks) - set(completed_tasks))

if len(tasks) > 0:
# resend the task to execute at least 1 second from now
amass.delay(results, tasks, countdown=1)
else:
# we done
print results

用例:

tasks = []

for i in xrange(10):
tasks.append(power.delay(i, 2))

amass.delay([], tasks)

应该做的是尽快异步启动所有任务。一旦它们都被发布到队列中,amass 任务也将被发布到队列中。 amass 任务将不断重新发布,直到所有其他任务都已完成。

关于python - 在所有任务完成后运行任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16308849/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com