gpt4 book ai didi

python - celeryd 和 celerybeat - "Received unregistered task of type..."

转载 作者:行者123 更新时间:2023-12-01 05:42:39 28 4
gpt4 key购买 nike

好吧,伙计们,这让我发疯。我已经为此工作了一整天,但无法让它发挥作用!
我的 celery 项目结构是这样的:

# celery.py

from celery.schedules import crontab
from celery import Celery

celery = Celery('scheduler.celery',
include=['scheduler.tasks'])

celery.config_from_object('celeryconfig')

还有:

# tasks.py

from scheduler.celery import celery

@celery.task
def test():
do_something()

还有:

# celeryconfig.py

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
'test-cron': {
'task': 'tasks.test',
'schedule': crontab(minute='*/1'),
},
}
# CELERY_IMPORTS = ('tasks', )
BROKER_URL = 'redis://localhost:6379/0'

所有文件都位于 projects/scheduler/ 下文件夹。
当我启动 celeryd服务我可以看到它正在运行并连接到我的代理,但是当我启动 celerybeat 时服务,我可以在日志中看到消息:Received unregistered task of type 'tasks.test' .
如果我取消注释 CELERY_IMPORTS常数(正如SO中许多答案所建议的那样),celeryd服务甚至无法启动!实际上,它输出 OK但使用 ps ef | grep celery我可以看到它没有运行。

我的守护进程conf文件如下所示:

# Name of nodes to start
CELERYD_NODES="w1"

# Where to chdir at start.
CELERYD_CHDIR="/home/me/projects/scheduler/"

# Extra arguments to celeryd
CELERYD_OPTS="--time-limit=300 --concurrency=4"

# %n will be replaced with the nodename.
CELERYD_LOG_FILE="/var/log/celery/%n.log"
CELERYD_PID_FILE="/var/run/celery/%n.pid"

# Workers should run as an unprivileged user.
CELERYD_USER="celery"
CELERYD_GROUP="celery"

# Extra arguments to celerybeat
CELERYBEAT_OPTS="--schedule=/var/run/celerybeat-schedule"

非常感谢任何帮助。

最佳答案

如果您没有 scheduler 模块并从模块根运行 celery:

runner.py:

from celery_test import celery

if __name__ == '__main__':
celery.start()

celery.py重命名为celery_test.py:

from celery import Celery

celery = Celery('scheduler.celery',
include=['tasks'])

celery.config_from_object('celeryconfig')

celeryconfig.py:

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
'test-cron': {
'task': 'tasks.test',
'schedule': crontab(minute='*/1'),
},
}

BROKER_URL = 'redis://localhost:6379/0'

tasks.py,修复导入:

from celery_test import celery

@celery.task
def test():
do_something()

您必须小心导入并添加runner.py,因为它导入celery_test,然后导入tasks,其中导入再次 celery_test

<小时/>

如果您有 scheduler 模块并从模块根运行 celery:

__init__.py 空。

runner.py:

from scheduler.celery_test import celery

if __name__ == '__main__':
celery.start()

celery.py重命名为celery_test.py:

from celery import Celery

celery = Celery('scheduler.celery',
include=['scheduler.tasks'])

celery.config_from_object('celeryconfig')

celeryconfig.py,修复默认任务名称:

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
'test-cron': {
'task': 'scheduler.tasks.test',
'schedule': crontab(minute='*/1'),
},
}

BROKER_URL = 'redis://localhost:6379/0'

tasks.py:

from scheduler.celery_test import celery

@celery.task
def test():
do_something()

关于python - celeryd 和 celerybeat - "Received unregistered task of type...",我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17143292/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com