gpt4 book ai didi

python - celery 不注册任务

转载 作者:太空宇宙 更新时间:2023-11-04 02:37:04 26 4
gpt4 key购买 nike

你好!我刚开始在 Django 中使用 Celery。我有一项任务需要定期执行。在管理界面中,我可以在名为“任务(已注册):”的下拉列表中看到我的任务。但是当 Celery Beat 尝试执行它时,会抛出 NotRegistered 异常。

Python 3.5.2、Django 1.11.4、Celery 4.1、django-celery-beat 1.1.0、django-celery-results 1.0.1

settings.py中与celery相关的部分:

CELERY_BROKER_URL = 'amqp://user:*****@192.168.X.X/proj'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_RESULT_BACKEND = 'django-db'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/Moscow'

celery.py 和 proj/__init__.py 与文档示例相同。

项目/celery.py:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

app.config_from_object(settings, namespace='CELERY')

app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))

项目/__init__.py:

from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app

__all__ = ['celery_app']

任务.py:

from celery import shared_task
from core.backend.files import SFTP

@shared_task
def load_files_from_sftp():
ftp = SFTP()
ftp.get_files()

我得到以下 json 结果:{"exc_message": "'core.tasks.load_files_from_sftp'", "exc_type": "NotRegistered"}

如果我尝试在 shell 中使用 celery.task.control.inspect() ,那里只有 debug_task() 。卡住了! (如果有任何帮助,我将不胜感激。

最佳答案

正如@MuhammadShoaib 所写,这只是一件事:

from django.apps import apps 

app.config_from_object(settings)
app.autodiscover_tasks(lambda: [n.name for n in apps.get_app_configs()])

代替

app.autodiscover_tasks()

为什么这不在文档中?...

关于python - celery 不注册任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47630459/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com