gpt4 book ai didi

python - 如何跨多个 celeryd 进程跟踪已撤销的任务

转载 作者:太空宇宙 更新时间:2023-11-03 19:20:02 25 4
gpt4 key购买 nike

我有一个提醒类型的应用程序,它使用“eta”参数在 celery 中安排任务。如果提醒对象中的参数发生变化(例如提醒时间),那么我会撤销之前发送的任务并排队一个新任务。

我想知道是否有任何好的方法可以在 celeryd 重新启动时跟踪已撤销的任务。我希望能够动态地向上/向下扩展 celeryd 进程,并且似乎在发送撤销命令后启动的任何 celeryd 进程仍将执行该任务。

一种方法是保留已撤销任务 ID 的列表,但这种方法会导致列表任意增长。修剪此列表需要保证任务不再位于 RabbitMQ 队列中,这似乎是不可能的。

我还尝试为每个 celeryd 工作人员使用共享的 --statedb 文件,但似乎 statesdb 文件仅在工作人员终止时更新,因此不适合我想要完成的任务。

提前致谢!

最佳答案

有趣的问题,我认为使用广播命令应该很容易解决。如果当一个新的工作人员启动时,它要求所有其他工作人员转储其已撤销的工作人员给新 worker 的任务。添加两个新的远程控制命令,您可以使用 @Panel.register,

轻松添加新命令

模块control.py:

from celery.worker import state
from celery.worker.control import Panel

@Panel.register
def bulk_revoke(panel, ids):
state.revoked.update(ids)

@Panel.register
def broadcast_revokes(panel, destination):
panel.app.control.broadcast("bulk_revoke", arguments={
"ids": list(state.revoked)},
destination=destination)

将其添加到 CELERY_IMPORTS:

CELERY_IMPORTS = ("control", )

现在唯一缺少的问题是连接它,以便新的工作人员启动时触发 broadcast_revokes。我想你可以使用 worker_ready信号:

from celery import current_app as celery
from celery.signals import worker_ready

def request_revokes_at_startup(sender=None, **kwargs):
celery.control.broadcast("broadcast_revokes",
destination=sender.hostname)

关于python - 如何跨多个 celeryd 进程跟踪已撤销的任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10062999/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com