gpt4 book ai didi

django - 如何使用 celery 队列中的倒数计时器刷新任务

转载 作者:行者123 更新时间:2023-12-01 03:46:30 24 4
gpt4 key购买 nike

我的 Celery 队列有数百个带有倒计时的任务,它们将在接下来的几个小时内触发。有没有办法让这些任务立即运行,以便有效地刷新队列?

我目前正在计划升级我们的服务器,我想确保升级完成时没有后台任务在运行。如果我必须等待这些倒计时,那没关系,但我宁愿强制执行任务。

另一种选择是暂停队列的处理,直到升级完成,但刷新似乎是一个更好的选择。

编辑 :我已经找到了如何查找已安排的任务列表:

from celery.task.control import inspect
i = inspect()
tasks = i.scheduled()

现在我只需要弄清楚如何强制执行它们。

最佳答案

好的,我很确定我已经大致了解了如何做到这一点。我正在将此答案作为维基并记下我的笔记,以防有人想在这里调整一般过程。

大体思路是这样的:

  • 停止向队列添加新项目。
  • 确定任何排队的任务。
  • 使用 result.revoke() 撤销所有这些任务.
  • 使用一些保存的状态重新启动这些任务。

  • 请注意,这不支持添加 eta一旦您将它们重新排队,就可以将它们添加到项目中,因为这可能是特定于实现的。

    因此,要找出排队的任务,您可以执行以下操作:
    from celery.task.control import inspect
    i = inspect()
    scheduled_tasks = i.scheduled()

    它返回一个字典,如下所示:
    {u'w1.courtlistener.com': [{u'eta': 1414435210.198864,
    u'priority': 6,
    u'request': {u'acknowledged': False,
    u'args': u'(2745724,)',
    u'delivery_info': {u'exchange': u'celery',
    u'priority': None,
    u'routing_key': u'celery'},
    u'hostname': u'w1.courtlistener.com',
    u'id': u'99bc8650-3be1-4d24-81d6-a882d77a8b25',
    u'kwargs': u'{}',
    u'name': u'citations.tasks.update_document_by_id',
    u'time_start': None,
    u'worker_pid': None}}]}

    下一步是撤销所有这些任务,例如:
    from celery.task.control import revoke
    with open('revoked_tasks.csv', 'w') as f:
    for worker, tasks in scheduled_tasks.iteritems():
    print "Now processing worker: %s" % worker
    for task in tasks:
    print "Now revoking task: %s. %s with args: %s and kwargs: %s" % \
    (task['request']['id'], task['request']['name'], task['request']['args'], task['request']['kwargs'])
    f.write('%s|%s|%s|%s|%s\n' % (worker, task['request']['name'], task['request']['id'], task['request']['args'], task['request']['kwargs']))
    revoke(task['request']['id'], terminate=True)

    然后,最后,像往常一样重新运行任务,从 CSV 文件加载它们:
    with open('revoked_tasks', 'r') as f:
    for line in f:
    worker, command, id, args, kwargs = line.split("|")
    # Impost task here, something like...
    package, module = command.rsplit('.', 1)
    mod = __import__(package, globals(), locals(), [module])

    # Run the commands, something like...
    mod.__get_attribute__(module).delay(args*, kwargs**)

    关于django - 如何使用 celery 队列中的倒数计时器刷新任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26535719/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com