gpt4 book ai didi

django - Celery 使用 app.control.purge() 时运行任务会发生什么?

转载 作者:行者123 更新时间:2023-12-05 01:37:44 25 4
gpt4 key购买 nike

目前我有一批 celery 与 django 一起运行,如下所示:

celery .py:

from __future__ import absolute_import, unicode_literals
import os
import celery
from celery import Celery
from celery.schedules import crontab
import django

load_dotenv(os.path.join(os.path.dirname(os.path.dirname(__file__)), '.env'))
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'base.settings')
django.setup()
app = Celery('base')

app.config_from_object('django.conf:settings', namespace='CELERY')

app.autodiscover_tasks()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
app.control.purge()
sender.add_periodic_task(30.0, check_loop.s())
recursion_function.delay() #need to use recursive because it need to wait for loop to finish(time can't be predict)
print("setup_periodic_tasks")

@app.task()
def check_loop():
.....
start = database start number
end = database end number
callling apis in a list from id=start to id=end
create objects
update database(start number = end, end number = end + 3)

....


@app.task()
def recursion_function(default_retry_delay=10):
.....
do some looping
....
#when finished, call itself again
recursion_function.apply_async(countdown=30)

我的目标是每当 celery 文件被编辑时它会重新启动所有任务 - 删除尚未执行的排队任务(我这样做是因为 recursion_function 完成后会再次运行它是检查我数据库中表的每条记录的工作,所以我不用担心它会在中途停止)。

check_loop 函数将调用具有分页功能的 api 以返回对象列表,我会将其与表中的记录进行比较,如果匹配则创建另一个模型的新自定义记录

我的问题是,当我清除所有消息时,当前正在运行的任务会中途停止还是会继续运行?因为如果 check_loop 函数在 api 列表中中途停止循环,那么它将再次运行循环,我将创建我不想要的新的重复记录

示例:

check_loop() 的运行任务中,它在中途创建对象(在 api 列表中从元素 id=2 到 id=5),服务器重启 -> 再次运行,现在 check_loop() 从头开始​​运行(在 api 列表上从元素 id=2 到 id=5)并再次从该列表创建对象(我 100% 不想要)

它是这样运行的吗?我只需要确认

编辑:

https://docs.celeryproject.org/en/4.4.1/faq.html#how-do-i-purge-all-waiting-tasks

我添加了 app.control.purge() 因为当我重新启动时 recursion_functionsetup_periodic_tasks 中再次被调用而之前的 recursion_function 来自 recursion_function.apply_async(countdown=30) 也执行所以它自己倍增

最佳答案

, worker 会continue execution of currently running task除非工作人员也重新启动。

此外,Celery Way始终期望任务在并发环境中运行,并考虑以下因素:

  • 有很多任务并发运行
  • 有很多celery worker在执行任务
  • 同样的任务可能会再次运行
  • 同一任务的多个实例可能同时运行
  • 任何任务都可以随时终止

即使您确定在您的环境中只有一个 worker 手动启动/停止并且这些不适用 - 应该以允许这一切发生的方式创建任务。

一些有用的技巧:

  • 使用数据库事务
  • 使用锁定
  • 将长时间运行的任务拆分成较快的任务
  • 如果任务有中间值要保存或者它们很重要(即不可重现,如某些 api 调用)并且它们在下一步中的处理需要时间 - 考虑拆分成几个链式任务

如果您一次只需要运行一个任务实例 - 使用某种锁定 - 在数据库或缓存以便其他人(相同的任务)可以检查并知道该任务正在运行,然后返回或等待前一个任务完成。

recursion_function 也可以是 Periodic Task。作为周期性任务将确保它在每个时间间隔运行,即使前一个任务因任何原因失败(因此无法像在常规非周期性任务中那样再次排队)。通过锁定,您可以确保一次只有一个在运行。


check_loop():

首先,建议将结果在一个事务中保存在数据库中,以确保在数据库中保存/修改所有或不保存任何内容。

您还可以保存一些标记,指示已保存对象的数量/状态,因此以后的任务可以只检查此标记,而不是每个对象。

或者在创建每个元素之前以某种方式检查它是否已经存在于数据库中。

关于django - Celery 使用 app.control.purge() 时运行任务会发生什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60557369/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com