gpt4 book ai didi

python - Celery:如何等待嵌套任务在组中完成

转载 作者:行者123 更新时间:2023-12-03 22:29:38 40 4
gpt4 key购买 nike

我需要使用一些参数向 API 发出请求,并一一处理来自请求的数据。但有时响应数据是分页的,这意味着我需要使用相同的参数发出额外的请求。
celery group允许我们一个一个地运行任务,但是如果任务产生子任务,子任务可以产生更多的子任务……,
有没有办法在运行组中的下一个任务之前等待所有子任务完成?或者 celery 给我们更好的方法来解决我的任务?

def some_api_call(start_date=None, end_date=None, token_value=None):
pass


@celery_app.task(name='run_task', max_retries=None, bind=True)
def run_task(self):
group_items = [
task1.s('2020-01-02', '2020-01-03'),
task1.s('2020-01-05', '2020-01-01'),
task1.s('2020-01-010', '2020-01-04'),
]
group(group_items)()

@celery_app.task(name='task1', max_retries=None, bind=True)
def task1(self, start_date=None, end_date=None, token_value=None, *args, **kwargs):
res = some_api_call(start_date, end_date, token_value)
if res['token_value']:
# NEXT ELEMENT IN THE GROUP SHOULD WAIT UNTIL NESTED CHILD TASKS DONE
task1.delay(token_value=token_value)
经纪人- Redis .
我可能的解决方案[伪代码]:
  • 等到父任务中的子任务完成
  •     res = task1.delay(token_value=token_value) 

    res.get()
    解决方案不好 - 我们挡住了胎面。
    不确定 celery 是否有等待替代品。
  • 用户 task.retry()检查子任务是否完成。
  •     taskid = task1.delay(token_value=token_value)
    if AsyncResult(taskid).state != "successes":

    self.retry()
    所以我们将重试父任务,直到子任务完成并且不要阻塞线程。
    但就像在第一个解决方案中一样:父任务处理了它的数据,但状态将是重试。

    最佳答案

    如果您需要等待 Group 中的任务完成,您应该使用 Chord原始。此外,如果您需要按顺序(一个接一个)执行任务,请使用 Chain 原语。 Chord 基本上是一个 Group 的链,还有一个最终的任务……

    关于python - Celery:如何等待嵌套任务在组中完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62495390/

    40 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com