gpt4 book ai didi

python - celery、celerybeat 和 django-celery-beat 可以在运行时动态添加/删除任务而无需重新启动 celerybeat 吗?

转载 作者:太空狗 更新时间:2023-10-29 17:35:07 24 4
gpt4 key购买 nike

我尝试了我能找到的一切,包括:

计算器

How to dynamically add / remove periodic tasks to Celery (celerybeat)

Can celery celerybeat dynamically add/remove tasks in runtime?

Github 问题

How to dynamically add or remove tasks to celerybeat?

我从上面得到的是,如果我只使用 celery 和 celery beat,我必须在添加/删除任务后重新启动 celery beat。但如果我结合 django-celery-beat,我就不必重新启动它。

我关注docs一步一步:

from celery import Celery
from celery.schedules import crontab

app = Celery('tasks')
app.config_from_object('celeryconfig')
app.conf.timezone = 'UTC'

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# Calls test('hello') every 10 seconds.
sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

# Calls test('world') every 30 seconds
sender.add_periodic_task(30.0, test.s('world'), expires=10)

# Executes every Monday morning at 7:30 a.m.
sender.add_periodic_task(
crontab(hour=7, minute=30, day_of_week=1),
test.s('Happy Mondays!'),
)

@app.task
def test(arg):
print(arg)

我的 celery 配置

BROKER_URL = 'amqp://rabbit'
CELERY_RESULT_BACKEND = 'rpc://rabbit'
CELERY_RESULT_PERSISTENT = True
# CELERY_ACKS_LATE = True
CELERY_DEFAULT_DELIVERY_MODE = 2
CELERY_TASK_RESULT_EXPIRES = 3600
CELERYBEAT_SCHEDULER ="django_celery_beat.schedulers:DatabaseScheduler"

我的 celery 节拍运行命令

celery -A tasks beat -l info -S django

这很好用,任务按预期运行。之后,我写了一个脚本来在运行时添加任务

import django
django.setup()
from tasks import app, setup_periodic_tasks
from django_celery_beat.models import PeriodicTask, CrontabSchedule


crontab = CrontabSchedule.objects.create(
minute='*/1',
hour='*',
day_of_week='*',
)

period = PeriodicTask.objects.create(
name='testfasd',
kwargs={},
crontab=crontab,
task='tasks.test',
)

setup_periodic_tasks(app)

当我查看数据库时,我得到了预期的结果,新记录以及 last_update 字段已更新。celery 中的日志也证明了这一点

[2016-12-20 17:37:21,796: INFO/MainProcess] Writing entries...
[2016-12-20 17:37:21,840: INFO/MainProcess] Scheduler: Sending due task add every 10 (tasks.test)
[2016-12-20 17:37:31,848: INFO/MainProcess] DatabaseScheduler: Schedule changed.
[2016-12-20 17:37:31,851: INFO/MainProcess] Writing entries...
[2016-12-20 17:37:31,930: INFO/MainProcess] Scheduler: Sending due task add every 10 (tasks.test)

我的问题是,虽然 celery beat 知道数据库已经改变,但它仍然发送旧任务而不发送新任务给 worker。有什么想法吗?

更新

我在我的项目中使用 docker,也许它是相关的。

最佳答案

This github issue

[You can't add or delete tasks in celerybeat] currently, you have to restart beat.

没有。为了刷新 celery[beat] 中的任务或任务计时,您必须重新启动 celery[beat] 实例。任务在运行时加载到内存中。为了更改/添加任务,您必须刷新实例。

您可以考虑使用自循环任务,使用自定义时间和条件执行。示例:

from datetime import timedelta
from celery import shared_task

@shared_task
def check_conditions():
# Do some db-level code
if condition:
check_conditions.apply_async(eta=timedelta(hours=6))

我在生产中使用它并且表现良好。

如果您需要重新加载任务,只需以编程方式重新启动 celery[beat]:

@shared_task
def autoreload():
if condition:
execute_shell_code_to_restart_celery()

我没有用过这个,不能保证它的可用性,但理论上应该可以。

This github issue

I have to reload beat to get this changes updated on worker ... using django-celery-beat ... This problem is still present on 4.0.2 and on master, tested [on December 21st, 2016].

关于python - celery、celerybeat 和 django-celery-beat 可以在运行时动态添加/删除任务而无需重新启动 celerybeat 吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41240322/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com