django - kombu/utils/objects.py, line 42, in __get__: return obj.__dict__[self.__name__] KeyError: 'data'

Reposted. Author: 行者123. Updated: 2023-12-02 11:21:21

settings.py

from celery.schedules import crontab

CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Makassar'
# Other Celery settings
CELERY_BEAT_SCHEDULE = {
    'add_task': {
        # Each beat entry names its task under the 'task' key.
        'task': 'data_loader.tasks.add_task',
        'schedule': crontab(minute=55, hour=13),
    },
    # 'task-number-two': {
    #     'task': 'data_loader.tasks.demo',
    #     'schedule': crontab(minute=2, hour='18'),
    # },
}

celery.py
………………
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'resolution_validator.settings')

app = Celery('resolution_validator')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

tasks.py
………………
from __future__ import absolute_import, unicode_literals

from celery.task import task


@task()
def add_task():
    print("hello world")
    a = 10
    return True

__init__.py
……………………
from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

I am also getting the following error:

"RecursionError: maximum recursion depth exceeded" with the KeyError.

I am using Django==2.0, celery==4.2.0, python==3.5.2.
I have not been able to find a solution.

Best answer

The problem is in your tasks.py. You should use the @shared_task decorator instead of the @task decorator:

from __future__ import absolute_import, unicode_literals

from celery import shared_task


@shared_task
def add_task():
    print("hello world")
    a = 10
    return True
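
A note on the traceback itself: the line quoted in the title, return obj.__dict__[self.__name__], has the shape of the lookup step in a caching descriptor. The following is a minimal sketch assuming that pattern. The names cached_property_sketch, Demo and describe_failure are hypothetical, and this is not kombu's actual code. It suggests why a KeyError like this may appear only as the context of a later exception (here a RuntimeError standing in for the RecursionError) rather than being the root cause.

```python
class cached_property_sketch:
    """Hypothetical caching descriptor, used only to illustrate the pattern."""

    def __init__(self, fget):
        self.fget = fget
        self.name = fget.__name__

    def __get__(self, obj, objtype=None):
        if obj is None:
            return self
        try:
            # On first access nothing is cached yet, so this raises KeyError.
            return obj.__dict__[self.name]
        except KeyError:
            # The value is computed here. If fget raises, Python chains that
            # error onto the KeyError being handled.
            value = obj.__dict__[self.name] = self.fget(obj)
            return value


class Demo:
    @cached_property_sketch
    def data(self):
        # Stand-in for whatever really fails while the value is computed.
        raise RuntimeError("the real failure")


def describe_failure():
    """Return the raised error type, its context type, and the context args."""
    try:
        Demo().data
    except RuntimeError as exc:
        context = exc.__context__
        return type(exc).__name__, type(context).__name__, context.args


print(describe_failure())
```

Under this assumption, the KeyError is a side effect of the cache miss, and the exception raised afterwards is the one worth investigating.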

Regarding django - kombu/utils/objects.py, line 42, in __get__: return obj.__dict__[self.__name__] KeyError: 'data', we found a similar question on Stack Overflow: https://stackoverflow.com/questions/52054031/
