gpt4 book ai didi

django - Celery+Docker+Django -- 让任务工作

转载 作者:行者123 更新时间:2023-12-05 05:25:02 26 4
gpt4 key购买 nike

过去一周我一直在尝试学习 Celery,并将其添加到我使用 Django 和 Docker-Compose 的项目中。我很难理解如何让它工作;我的问题是在使用任务时我似乎无法上传到我的数据库来工作。上传功能 insertIntoDatabase 之前在没有任何 Celery 参与的情况下工作正常,但现在上传不起作用。事实上,当我尝试上传时,我的网站太快告诉我上传成功了,但实际上什么都没有上传。

服务器使用 docker-compose up 启动,它将进行迁移、执行迁移、收集静态文件、更新需求,然后启动服务器。这一切都是使用 pavement.py 完成的; Dockerfile 中的命令是 CMD paver docker_run。 Celery worker 在任何时候都不会明确启动;我应该这样做吗?如果是,怎么办?

这是我在 views.py 中调用上传函数的方式:

insertIntoDatabase.delay(datapoints, user, description)

上传函数在名为databaseinserter.py 的文件中定义。以下装饰器用于 insertIntoDatabase:

@shared_task(bind=True, name="database_insert", base=DBTask)

下面是 celery.pyDBTask 类的定义:

class DBTask(Task):
abstract = True

def on_failure(self, exc, *args, **kwargs):
raise exc

我不太确定要为 tasks.py 写什么。这是一位前同事在我从他离开的地方接起之前给我留下的东西:

from celery.decorators import task
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@task(name="database_insert")
def database_insert(data):

下面是我用来配置 Celery 的设置(settings.py):

BROKER_TRANSPORT = 'redis'
_REDIS_LOCATION = 'redis://{}:{}'.format(os.environ.get("REDIS_PORT_6379_TCP_ADDR"), os.environ.get("REDIS_PORT_6379_TCP_PORT"))
BROKER_URL = _REDIS_LOCATION + '/0'
CELERY_RESULT_BACKEND = _REDIS_LOCATION + '/1'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = "UTC"

现在,我猜 tasks.py 中的 database_insert 不应该是空的,但是那里应该放什么呢?此外,似乎 tasks.py 中的任何事情都没有发生——当我添加一些日志语句以查看 tasks.py 是否至少正在运行时,什么都没有实际上最终被记录下来,让我觉得 tasks.py 甚至没有运行。如何正确地将我的上传功能变成任务?

最佳答案

我认为你离实现这个目标不远了。

首先,我建议您尽量将 Celery 任务和业务逻辑分开。因此,例如,在 insertIntoDatabase 函数中包含将数据插入 DB 的业务逻辑,然后单独创建一个 Celery 任务,可能名称为 insert_into_db_task,它将您的 args 作为纯 python 对象(重要)并使用这些 args 调用上述 insertIntoDatabase 函数以实际完成数据库插入。

该示例的代码可能如下所示:

my_app/tasks/insert_into_db.py

from celery.decorators import task
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@task()
def insert_into_db_task(datapoints, user, description):
from my_app.services import insertIntoDatabase
insertIntoDatabase(datapoints, user, description)

my_app/services/insertIntoDatabase.py

def insertIntoDatabase(datapoints, user, description):
"""Note that this function is not a task, by design"""

# do db insertion stuff

my_app/views/insert_view.py

from my_app.tasks import insert_into_db_task

def simple_insert_view_func(request, args, kwargs):
# start handling request, define datapoints, user, description

# next line creates the **task** which will later do the db insertion
insert_into_db_task.delay(datapoints, user, description)
return Response(201)

我所暗示的应用程序结构正是我会如何做的,不是必需的。另请注意,您可以直接使用 @task() 并且不为其定义任何参数。可能会为您简化事情。

这有帮助吗?我喜欢让我的任务轻松自如。他们大多只是做防 SCSS (例如,确保涉及的对象存在于数据库中),调整任务失败时会发生什么(稍后重试?中止任务?等),日志记录,否则他们执行位于其他地方的业务逻辑。

此外,如果不是很明显,您确实需要在某处运行 celery,以便有工作人员实际处理您的 View 代码正在创建的任务。如果您不在某处运行 celery,那么您的任务将堆积在队列中,永远不会得到处理(因此您的数据库插入永远不会发生)。

关于django - Celery+Docker+Django -- 让任务工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32683844/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com