gpt4 book ai didi

python - Django Celery AbortableTask 使用

转载 作者:行者123 更新时间:2023-12-01 06:15:06 25 4
gpt4 key购买 nike

我正在尝试使用 AbortableTask Celery 的功能,但文档示例似乎不适合我。给出的例子是:

from celery.contrib.abortable import AbortableTask

def MyLongRunningTask(AbortableTask):

def run(self, **kwargs):
logger = self.get_logger(**kwargs)
results = []
for x in xrange(100):
# Check after every 5 loops..
if x % 5 == 0: # alternatively, check when some timer is due
if self.is_aborted(**kwargs):
# Respect the aborted status and terminate
# gracefully
logger.warning("Task aborted.")
return None
y = do_something_expensive(x)
results.append(y)
logger.info("Task finished.")
return results

from myproject.tasks import MyLongRunningTask

def myview(request):

async_result = MyLongRunningTask.delay()
# async_result is of type AbortableAsyncResult

# After 10 seconds, abort the task
time.sleep(10)
async_result.abort()

...

但是,我收到错误:

TypeError: MyLongRunningTask() takes exactly 1 argument (0 given)

我做错了什么?

最佳答案

只是一个猜测,但我认为应该是

class MyLongRunningTask(AbortableTask)

而不是

def MyLongRunningTask(AbortableTask)

关于python - Django Celery AbortableTask 使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/3659561/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com