gpt4 book ai didi

celery - 安排 Celery 任务在其他任务完成后运行

转载 作者:行者123 更新时间:2023-12-04 14:27:52 26 4
gpt4 key购买 nike

我想完成这样的事情:

results = []
for i in range(N):
data = generate_data_slowly()
res = tasks.process_data.apply_async(data)
results.append(res)
celery.collect(results).then(tasks.combine_processed_data())

即在很长一段时间内启动异步任务,然后安排一个依赖任务,该任务仅在所有较早的任务完成后才执行。

我看过 chainchord 之类的东西,但它们似乎只有在您可以完全预先构建任务图时才有效。

最佳答案

对于任何感兴趣的人,我最终使用了这个片段:

@app.task(bind=True, max_retries=None)
def wait_for(self, task_id_or_ids):
try:
ready = app.AsyncResult(task_id_or_ids).ready()
except TypeError:
ready = all(app.AsyncResult(task_id).ready()
for task_id in task_id_or_ids)

if not ready:
self.retry(countdown=2**self.request.retries)

然后像这样编写工作流:

task_ids = []
for i in range(N):
task = (generate_data_slowly.si(i) |
process_data.si(i)
)
task_id = task.delay().task_id
task_ids.append(task_id)

final_task = (wait_for(task_ids) |
combine_processed_data.si()
)

final_task.delay()

关于celery - 安排 Celery 任务在其他任务完成后运行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41730218/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com