gpt4 book ai didi

python - Celery:基于自定义基类/子类的任务未显示在 app.tasks 下

转载 作者:行者123 更新时间:2023-12-01 03:30:28 25 4
gpt4 key购买 nike

我正在尝试创建一些 celery 任务作为类(class),但遇到了一些困难。这些类(class)是:

class BaseCeleryTask(app.Task):

def is_complete(self):
""" default method for checking if celery task has completed. """
# simply return result (since by default tasks return boolean indicating completion)

try:
return self.result
except AttributeError:
logger.error('Result not defined. Make sure task has run!')
return False


class MacroReportTask(BaseCeleryTask):

def run(self, params):
""" Override the default run method with signal factory run"""
# hold on to the factory
process = MacroCountryReport(params)
self.result = process.run()
return self.result

但是当我初始化应用程序并检查 app.tasks (或运行工作程序)时,应用程序的注册表中似乎没有上述任务。其他基于函数的任务(使用 app.task() 装饰器)似乎注册得很好。

我将上述任务运行为:

process = SignalFactoryTask()
process.delay(params)

Celery 工作线程错误并显示以下消息:收到类型为 None 的未注册任务

我认为我遇到的问题是:如何像处理基于常规函数的任务一样将自定义类添加到任务注册表中?

最佳答案

遇到了完全相同的问题,花了几个小时才找到解决方案,因为我 90% 确定这是一个错误。在类任务中,尝试以下操作

class BaseCeleryTask(app.Task):

def __init__(self):
self.name = "[modulename].BaseCeleryTask"

class MacroReportTask(app.Task):

def __init__(self):
self.name = "[modulename].MacroReportTask"

似乎在应用程序中注册它仍然存在一个错误,即名称不会自动配置。让我知道这是否有效。

关于python - Celery:基于自定义基类/子类的任务未显示在 app.tasks 下,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40988688/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com