gpt4 book ai didi

python - Celery 在使用 Django 数据库执行之前撤销任务

转载 作者:太空宇宙 更新时间:2023-11-04 10:33:20 25 4
gpt4 key购买 nike

出于并发原因,我使用 Django 数据库而不是 RabbitMQ。

但我无法解决任务执行前撤销的问题。

我找到了一些关于这个问题的答案,但它们似乎并不完整,或者我得不到足够的帮助。

我如何使用模型扩展 celery 任务表,添加一个 bool 字段(已撤销)以在我不想执行任务时进行设置?

谢谢。

最佳答案

由于 Celery 通过 ID 跟踪任务,您真正需要的只是能够分辨出哪些 ID 已被取消。与其修改 kombu 内部结构,不如创建自己的表(或 memcached 等)来跟踪已取消的 ID,然后检查当前可取消任务的 ID 是否在其中.

这是支持远程 revoke 命令的传输在内部执行的操作:

All worker nodes keeps a memory of revoked task ids, either in-memory or persistent on disk (see Persistent revokes). (from Celery docs)

当你使用 django 传输时,你有责任自己做这件事。在这种情况下,由每个任务检查它是否已被取消。

因此您的任务的基本形式(添加日志记录代替实际操作)变为:

from celery import shared_task
from celery.exceptions import Ignore
from celery.utils.log import get_task_logger
from .models import task_canceled
logger = get_task_logger(__name__)

@shared_task
def my_task():
if task_canceled(my_task.request.id):
raise Ignore
logger.info("Doing my stuff")

您可以通过各种方式扩展和改进它,例如通过创建一个基本的 CancelableTask 类,就像您链接到的其他答案之一一样,但这是基本形式。您现在缺少的是模型和检查它的函数。

请注意,在这种情况下,ID 将是一个字符串 ID,如 a5644f08-7d30-43ff-a61e-81c165ad9e19不是整数。您的模型可以像这样简单:

from django.db import models

class CanceledTask(models.Model):
task_id = models.CharField(max_length=200)

def cancel_task(request_id):
CanceledTask.objects.create(task_id=request_id)

def task_canceled(request_id):
return CanceledTask.objects.filter(task_id=request_id).exists()

您现在可以在执行以下操作时通过查看 celery 服务的调试日志来检查行为:

my_task.delay()
models.cancel_task(my_task.delay())

关于python - Celery 在使用 Django 数据库执行之前撤销任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25046200/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com