gpt4 book ai didi

python - 如何通过任务名称对 Celery 任务进行速率限制?

转载 作者:行者123 更新时间:2023-12-01 02:59:03 26 4
gpt4 key购买 nike

我正在使用 Celery 处理来自 Django 应用程序的异步任务。大多数任务都很短,只需几秒钟即可完成,但我有一个任务可能需要几个小时。

由于我的服务器上的处理限制,Celery 配置为一次仅运行 2 个任务。这意味着如果有人启动其中两个长时间运行的任务,它会有效地阻止所有其他 Celery 处理站点几个小时,这是非常糟糕的。

是否有任何方法可以配置 Celery,使其一次只处理一种类型的任务,且一次不超过一个?像这样的东西:

@task(max_running_instances=1)
def my_really_long_task():
for i in range(1000000000):
time.sleep(6000)

请注意,我不想取消 my_really_long_task 的所有其他启动。我只是不希望它们立即开始,而只在所有其他同名任务完成后才开始。

由于 Celery 似乎不支持这一点,我当前的 hacky 解决方案是查询任务中的其他任务,如果我们找到其他正在运行的实例,则重新安排自己稍后运行,例如

from celery.task.control import inspect

def get_all_active_celery_task_names(ignore_id=None):
"""
Returns Celery task names for all running tasks.
"""
i = inspect()
task_names = defaultdict(int) # {name: count}
if i:
active = i.active()
if active is not None:
for worker_name, tasks in i.active().iteritems():
for task in tasks:
if ignore_id and task['id'] == ignore_id:
continue
task_names[task['name']] += 1
return task_names

@task
def my_really_long_task():

all_names = get_all_active_celery_task_names()
if 'my_really_long_task' in all_names:
my_really_long_task.retry(max_retries=100, countdown=random.randint(10, 300))
return

for i in range(1000000000):
time.sleep(6000)

有更好的方法吗?

我知道其他一些黑客解决方案,例如 this ,但是设置单独的 memcache 服务器来跟踪任务唯一性甚至更不可靠,而且比我上面使用的方法更复杂。

最佳答案

另一种解决方案是将 my_really_long_task 放入单独的队列中。

 my_really_long_task.apply_async(*args, queue='foo')

然后启动一个并发数为1的worker来消费这些任务,这样一次只执行1个任务。

celery -A foo worker -l info -Q foo

关于python - 如何通过任务名称对 Celery 任务进行速率限制?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43986085/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com