gpt4 book ai didi

python - django + celery : disable prefetch for one worker, 有错误吗?

转载 作者:行者123 更新时间:2023-12-01 07:09:30 29 4
gpt4 key购买 nike

我有一个带有 celery 的 Django 项目

由于 RAM 限制,我只能运行两个工作进程。

我的任务既有“慢”又有“快”。快速任务应尽快执行。短时间内(0.1 秒 - 3 秒)可能有许多快速任务,因此理想情况下两个 CPU 都应该处理它们。

慢速任务可能会运行几分钟,但结果可能会延迟。

慢任务出现的频率较低,但可能会出现 2 或 3 个任务同时排队的情况。

我的想法是拥有一个:

  • 1 个 celery 工作线程 W1,并发度为 1,仅处理快速任务
  • 1 个 celery 工作线程 W2,并发度为 1,可以处理快速和慢速任务。

celery 默认情况下的任务预取乘数 ( https://docs.celeryproject.org/en/latest/userguide/configuration.html#worker-prefetch-multiplier ) 为 4,这意味着 4 个快速任务可以在慢速任务后面排队,并且可能会延迟几分钟。因此我想禁用工作线程 W2 的预取。该文档指出:

To disable prefetching, set worker_prefetch_multiplier to 1. Changing that setting to 0 will allow the worker to keep consuming as many messages as it wants.

但是我观察到,当 prefetch_multiplier 为 1 时,会预取一个任务,并且仍然会被慢速任务延迟。

这是一个文档错误吗?这是一个实现错误吗?或者我误解了文档?有什么办法可以实现我想要的吗?

我执行来启动工作程序的命令是:

celery -A miniclry worker --concurrency=1 -n w2 -Q=fast,slow --prefetch-multiplier 0
celery -A miniclry worker --concurrency=1 -n w1 -Q=fast

我的 celery 设置是默认设置,除了:

CELERY_BROKER_URL = "pyamqp://*****@localhost:5672/mini"
CELERY_TASK_ROUTES = {
'app1.tasks.task_fast': {"queue": "fast"},
'app1.tasks.task_slow': {"queue": "slow"},
}

我的 django 项目的 celery.py 文件是:

from __future__ import absolute_import
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'miniclry.settings')
app = Celery("miniclry", backend="rpc", broker="pyamqp://")
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

我的django项目的__init__.py

from .celery import app as celery_app
__all__ = ('celery_app',)

我的 worker 的代码

import time, logging
from celery import shared_task
from miniclry.celery import app as celery_app
logger = logging.getLogger(__name__)

@shared_task
def task_fast(delay=0.1):
logger.warning("fast in")
time.sleep(delay)
logger.warning("fast out")

@shared_task
def task_slow(delay=30):
logger.warning("slow in")
time.sleep(delay)
logger.warning("slow out")

如果我从管理 shell 执行以下操作,我会发现该快速任务仅在慢速任务完成后才会执行。

from app1.tasks import task_fast, task_slow

task_slow.delay()
for i in range(30):
task_fast.delay()

有人可以帮忙吗?

如果认为有帮助,我可以发布整个测试项目。只是建议推荐的交换此类项目的方式

版本信息:

  • celery ==4.3.0
  • Django==1.11.25
  • Python 2.7.12

最佳答案

我确认了这个问题,this中有一个错误文档部分。 worker_prefetch_multiplier = 1 顾名思义,将worker的预取设置为1,意味着worker除了当前正在执行的任务之外,还会多持有一项任务。

要真正禁用预取,您还需要使用 task_acks_late = True 以及预取设置,请参阅 this文档部分

关于python - django + celery : disable prefetch for one worker, 有错误吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58290045/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com