gpt4 book ai didi

python / celery : how can I kill subtasks when killing a parent task?

转载 作者:行者123 更新时间:2023-11-28 18:59:51 27 4
gpt4 key购买 nike

上下文

我创建了一个调用 celery 任务的 Django 应用程序,该任务又会生成其他任务并等待它们完成。

工作流程如下:

1) 主要的python/django代码在后台启动一个celery任务

2) celery 任务处理一些代码,然后启动一组不同的 celery 任务并等待它们准备就绪

3) 该组的每个任务然后以相同的方式产生另一组子任务并等待它们完成

它运行良好(虽然我是初学者并且可能实现得不好)但现在我希望能够终止每个子进程,如果我杀死开始时开始的主要 celery 任务。

到目前为止我有什么

我使用生成多个子任务的简单父任务重新创建了这种情况,并且我修改了 celery Task 类的“on_failure”方法以在失败时杀死它的子任务。

Tasks.py

from celery import Celery, group,Task, result
from celery.signals import task_revoked
import time
from pprint import pprint
application = Celery('tasks',backend='amqp://',broker='amqp://guest@localhost//')


class MyTask(Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
print(self.AsyncResult(task_id).children[0].revoke(terminate=True,signal='SIGTERM'))
print('{0!r} failed: {1!r}'.format(task_id, exc))

@application.task(base=MyTask)
def childTask():
while True:
time.sleep(10)
print("Message de la tache enfant")
continue

@application.task(base=MyTask)
def parentTask(pra_id = None):
child_tasks = []
print("Lancement tache mère")
child_tasks.append(childTask.s())
child_tasks.append(childTask.s())
child_tasks.append(childTask.s())
tasks = group(child_tasks)
tasks.apply_async()

time.sleep(15)
raise KeyError

main.py

from tasks import parentTask

parent1 = parentTask.delay(pra_id = 10)
parent2 = parentTask.delay(pra_id = 20)

当代码引发错误时,父任务及其子任务也被成功杀死,这就是我想要的。

我需要什么

我需要能够从我的 Django 应用程序中手动终止我的父任务。

这是通过检查 celery worker 并通过搜索它的参数找到我的任务来完成的,这是成功完成的,但是,当我找到它后手动撤销 celery 任务时,它不会终止由此产生的子任务任务,这就是我需要的。

到目前为止我尝试了什么

我尝试创建一个由“撤销”信号触发的函数

(http://docs.celeryproject.org/en/latest/userguide/signals.html#task-revoked)

将在任务被撤销时执行。

捕获信号有效(我能够在撤销任务时执行一些代码)但我无法使用与上述“On_failure”方法相同的代码)来终止子任务。

问题

发送到该函数的 Request 对象确实包含我的父任务,但是当它应该包含一个包含子任务的 GroupResult 对象时,该类的“children”属性为空。

最佳答案

不确定这是否对您有帮助,但我发现在创建子任务 ID 时将每个子任务 ID 存储在 Redis 或某些数据库中,并将它们与 pipeline_id 相关联。然后如果我需要终止父任务,我也可以终止存储在列表中的所有子任务。

result.revoke(terminate=True)

subtask_results = get_subtask_status(pipeline_id) #Custom Function

for subtask_result in subtask_results:
subtask_result.revoke(terminate=True)

关于 python / celery : how can I kill subtasks when killing a parent task?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53848059/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com