python - How to dynamically add/remove periodic tasks to Celery (celerybeat)

Reposted. Author: IT老高. Updated: 2023-10-28 21:35:05

If I have a function defined as follows:

def add(x, y):
    return x + y

Is there a way to dynamically add this function as a Celery PeriodicTask and start it at runtime? I'd like to be able to do something like this (pseudocode):

some_unique_task_id = celery.beat.schedule_task(add, run_every=crontab(minute="*/30"))
celery.beat.start(some_unique_task_id)

I'd also like to stop or remove that task dynamically, with something like this (pseudocode):

celery.beat.remove_task(some_unique_task_id)

celery.beat.stop(some_unique_task_id)

FYI, I am not using djcelery, which lets you manage periodic tasks through the Django admin.

Best answer

This question was answered on Google Groups.

I am not the author; all credit goes to Jean Mark.

Here's a proper solution for this, confirmed working. In my scenario, I subclassed PeriodicTask and created a model out of it, since I can add other fields to the model as I need them, and also so I could add the "terminate" method. You have to set the periodic task's enabled property to False and save it before you delete it. The subclassing is not a must; the schedule_every method is the one that really does the work. When you're ready to terminate your task (if you didn't subclass it), you can just use PeriodicTask.objects.filter(name=...) to search for your task, disable it, then delete it.

Hope this helps!
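
A minimal sketch of the non-subclassed route described above (look the task up by name, disable it, save, then delete). It is not taken from the original answer. The helper name `disable_and_delete` and its `manager` parameter are hypothetical; `manager` is assumed to behave like `PeriodicTask.objects` from djcelery, so the function itself has no Django dependency.

```python
def disable_and_delete(manager, name):
    """Disable, save, then delete every periodic task with the given name.

    `manager` is assumed to behave like djcelery's `PeriodicTask.objects`:
    `filter(name=...)` yields objects exposing `enabled`, `save()` and
    `delete()`. Returns the number of tasks removed.
    """
    removed = 0
    for ptask in manager.filter(name=name):
        ptask.enabled = False
        ptask.save()    # persist the disabled state before deleting
        ptask.delete()
        removed += 1
    return removed
```

With djcelery installed, the call would presumably look like `disable_and_delete(PeriodicTask.objects, ptask_name)`, where `ptask_name` is the name the task was saved under.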

import json
from datetime import datetime

from django.db import models
from djcelery.models import PeriodicTask, IntervalSchedule


class TaskScheduler(models.Model):

    # CASCADE was the implicit default on older Django versions; stated explicitly here
    periodic_task = models.ForeignKey(PeriodicTask, on_delete=models.CASCADE)

    @staticmethod
    def schedule_every(task_name, period, every, args=None, kwargs=None):
        """Schedules a task by name every "every" "period". An example call would be:
            TaskScheduler.schedule_every('mycustomtask', 'seconds', 30, [1, 2, 3])
        which schedules your custom task to run every 30 seconds with the
        arguments 1, 2 and 3 passed to the actual task.
        """
        permissible_periods = ['days', 'hours', 'minutes', 'seconds']
        if period not in permissible_periods:
            raise Exception('Invalid period specified')
        # create the periodic task and the interval
        ptask_name = "%s_%s" % (task_name, datetime.now())  # create some name for the periodic task
        interval_schedules = IntervalSchedule.objects.filter(period=period, every=every)
        if interval_schedules:  # reuse a matching interval schedule if one already exists
            interval_schedule = interval_schedules[0]
        else:  # create a brand new interval schedule
            interval_schedule = IntervalSchedule()
            interval_schedule.every = every  # should check to make sure this is a positive int
            interval_schedule.period = period
            interval_schedule.save()
        ptask = PeriodicTask(name=ptask_name, task=task_name, interval=interval_schedule)
        # djcelery stores args and kwargs as JSON-encoded text
        if args:
            ptask.args = json.dumps(args)
        if kwargs:
            ptask.kwargs = json.dumps(kwargs)
        ptask.save()
        return TaskScheduler.objects.create(periodic_task=ptask)

    def stop(self):
        """pauses the task"""
        ptask = self.periodic_task
        ptask.enabled = False
        ptask.save()

    def start(self):
        """starts the task"""
        ptask = self.periodic_task
        ptask.enabled = True
        ptask.save()

    def terminate(self):
        """disables the task, then deletes both this record and the periodic task"""
        self.stop()
        ptask = self.periodic_task
        self.delete()
        ptask.delete()
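
The inline comment in schedule_every notes that `every` should be checked to be a positive int, but the code never performs that check. One way it might be done is sketched below as a standalone helper. The name `validate_interval` and the choice of `ValueError` are assumptions for illustration and are not part of the original answer.

```python
PERMISSIBLE_PERIODS = ("days", "hours", "minutes", "seconds")


def validate_interval(period, every):
    """Return (period, every) if both are usable, otherwise raise ValueError."""
    if period not in PERMISSIBLE_PERIODS:
        raise ValueError("Invalid period specified: %r" % (period,))
    # bool is a subclass of int, so reject it explicitly
    if isinstance(every, bool) or not isinstance(every, int) or every <= 0:
        raise ValueError("'every' must be a positive integer, got %r" % (every,))
    return period, every
```

Calling this at the top of schedule_every, before any IntervalSchedule is looked up or created, would keep invalid intervals from being saved.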

Regarding "python - How to dynamically add/remove periodic tasks to Celery (celerybeat)", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/10194975/
