gpt4 book ai didi

python - 阻止 celery 任务开始,直到完成具有相似参数的不同任务

转载 作者:行者123 更新时间:2023-12-03 14:46:28 26 4
gpt4 key购买 nike

假设我有一个 celery 任务,它需要两个参数:X(a,b)我需要使用以下两条规则来实现自定义并发逻辑:

  • X 的实例如果 a 的值不同,则可以同时运行.也就是说,如果 X(a=1,b=10)正在运行时 X(a=2,b=20)被添加到队列中,然后后者从队列中拉出并立即执行。
  • X 的实例如果 a 的值相同,则不能同时运行.也就是说,如果 X(a=1,b=10)正在运行时 X(a=1,b=20)被添加到队列中,那么后者必须在队列中等待,直到前者完成。

  • 规则 #1 通过设置 worker_concurrency>1 与 celery 一起开箱即用。 ( docs)。规则 #2 是一个棘手的规则。
    分布式任务锁定,如 docs 中所述在 this blog , 是一种让我接近我需要的方法。甚至还有一些库可以为您实现它( celery-singleton)。但是,回顾规则#2,这种方法似乎可以防止第二个任务在第一个任务完成之前排队。我需要它排队,直到第一个任务完成才在工作人员上执行。
    有没有办法实现这个? This SO question问了一个类似的问题,但到目前为止没有答案。

    最佳答案

    这似乎是使用 redis 的好例子。和绑定(bind) celery 任务。如果你还没有这样做,你也可以使用 redis 作为你的 celery 代理,如果你需要的话,也可以作为缓存层。这真是一把瑞士军刀。 Deploying redis也很简单。我强烈鼓励任何人更熟悉它。这是一个很好的工具,可以放在一个人的工具箱中。
    我会稍微改变一下这个例子,因为我总是对单字符函数和变量感到困惑。

    # Think of this as X(a,b) from the question
    @task
    def add(num1, num2):
    return num1 + num2
    然后我们可以升级 add看起来更像这样:
    # "bind" the task so we have access to all the Task base class functionality
    # via "self".
    # https://docs.celeryproject.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.retry
    @task(bind=True)
    def add(self, num1, num2):
    if does_running_task_exist_with(num1):
    # requeue. Please visit the docs for "retry" mentioned above.
    # There are also max_retries and some other nice things.
    # Try again in 10s
    self.retry(countdown=10)
    return
    return num1 + num2
    我们的 does_running_task_exist_with然后辅助函数将使用 redis 集。像所有 Set 实现一样,它们保证唯一性并且检查成员是否存在很快。
    # Using https://github.com/andymccurdy/redis-py
    import redis

    def does_running_task_exist_with(some_number):
    # Connect to redis.
    # Using database number 2. You might be using db 0 for celery brokerage,
    # and db 1 for celery result storage. Using a separate DB is just nice
    # for isolation. Redis has up to 16.
    # Connects to localhost by default.
    redis_conn = redis.StrictRedis(db=2)
    # we try adding this number to the Set of currently processing numbers
    # https://redis.io/commands/sadd
    # Return value: the number of elements that were added to the set,
    # not including all the elements already present into the set.
    members_added = redis_conn.sadd("manager_task_args", str(some_number))
    # Or shortcut it as "return members_added == 0". This here is
    # more expressive though
    if members_added == 0:
    return True
    return False
    好的。现在跟踪和决策已经到位。缺少的一件重要事情是:一旦 add任务完成,我们需要删除 num1从 redis 集中。让我们稍微调整一下功能。
    import redis

    @task(bind=True)
    def add(self, num1, num2):
    if does_running_task_exist_with(num1):
    self.retry(countdown=10)
    return
    # Do actual work…
    result = num1 + num2
    # Cleanup
    redis_conn = redis.StrictRedis(db=2)
    redis_conn.srem("manager_task_args", str(num1))
    return result
    但是如果事情出错了怎么办?如果添加失败怎么办?然后我们的 num1永远不会从集合中删除,我们的队列开始变得越来越长。我们不希望那样。你可以在这里做两件事:要么创建 a class-based task with an on_failure method , 或将其包装在 try-except-finally 中。我们将走 try-finally 路线,因为在这种情况下更容易遵循:
    import redis

    @task(bind=True)
    def add(self, num1, num2):
    if does_running_task_exist_with(num1):
    self.retry(countdown=10)
    return
    try:
    result = num1 + num2
    finally:
    redis_conn = redis.StrictRedis(db=2)
    redis_conn.srem("manager_task_args", str(num1))
    return result
    那应该这样做。请注意,您可能还想查看 redis connection pooling如果你有大量的任务。

    关于python - 阻止 celery 任务开始,直到完成具有相似参数的不同任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66180268/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com