python - Celery: prevent adding more tasks when too many are already queued

Reposted · Author: 太空宇宙 · Updated: 2023-11-03 21:05:58

I have a Flask REST API that uses Celery to run asynchronous requests.

The idea is that an async=1 query parameter indicates the request should be handled asynchronously (immediately returning a task ID that the client will use later).

At the same time, I want to stop accepting new tasks when too many tasks are already waiting to be processed.

The code below works, but accepting_new_tasks() takes about 2 seconds, which is too slow.

Is there a setting in Celery (or something else) that limits the number of waiting tasks, or a faster way to get the number of waiting tasks?
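(For reference, the broker itself can enforce such a cap: RabbitMQ supports a per-queue x-max-length argument, which Celery can declare through kombu queue arguments. A minimal sketch, assuming RabbitMQ as the broker; the queue name and limit here are illustrative, and note that by default RabbitMQ drops the oldest messages when the limit is reached unless x-overflow is set to reject-publish.)

```python
from celery import Celery
from kombu import Queue

celery_app = Celery("tasks", broker="rabbit...")

# Hypothetical config sketch: cap the default "celery" queue at 10 messages
# on the broker side, rejecting new publishes instead of dropping old ones.
celery_app.conf.task_queues = [
    Queue(
        "celery",
        queue_arguments={
            "x-max-length": 10,
            "x-overflow": "reject-publish",
        },
    )
]
```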

import math

from celery import Celery
from flask import abort, Flask, jsonify, request


flask_app = Flask(__name__)
celery_app = Celery("tasks", broker="rabbit...")


@flask_app.route("/")
def home():
    async_ = request.args.get("async")
    settings = request.args.get("settings")

    if async_:
        if not accepting_new_tasks(celery_app):
            return abort(503)

        task = celery_app.send_task(name="my-task", kwargs={"settings": settings})
        return jsonify({"taskId": task.id})

    return jsonify({})


def accepting_new_tasks(celery_app):
    inspector = celery_app.control.inspect()
    nodes_stats = inspector.stats()
    nodes_reserved = inspector.reserved()

    workers = 0
    for stats in nodes_stats.values():
        workers += stats["pool"]["max-concurrency"]

    waiting_tasks = 0
    for reserved in nodes_reserved.values():
        waiting_tasks += len(reserved)

    return waiting_tasks < math.ceil(workers / 3)

Best answer

I eventually solved this by querying the RabbitMQ management API, as pointed out by https://stackoverflow.com/a/27074594/4183498.

import math

from celery import Celery
from flask import abort, Flask, jsonify, request
from requests import get
from requests.auth import HTTPBasicAuth


flask_app = Flask(__name__)
celery_app = Celery("tasks", broker="rabbit...")


def get_workers_count():
    # One slow inspect() round-trip at startup instead of on every request.
    inspector = celery_app.control.inspect()
    nodes_stats = inspector.stats()

    workers = 0
    for stats in nodes_stats.values():
        workers += stats["pool"]["max-concurrency"]

    return workers


WORKERS_COUNT = get_workers_count()


@flask_app.route("/")
def home():
    async_ = request.args.get("async")
    settings = request.args.get("settings")

    if async_:
        if not accepting_new_tasks():
            return abort(503)

        task = celery_app.send_task(name="my-task", kwargs={"settings": settings})
        return jsonify({"taskId": task.id})

    return jsonify({})


def accepting_new_tasks():
    # Ask the RabbitMQ management API for the queue's message count.
    auth = HTTPBasicAuth("guest", "guest")
    response = get(
        "http://localhost:15672/api/queues/my_vhost/celery",
        auth=auth
    )
    waiting_tasks = response.json()["messages"]
    return waiting_tasks < math.ceil(WORKERS_COUNT / 3)
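One detail worth noting with the management API: the vhost goes into the URL path percent-encoded, so the default vhost "/" becomes %2F. A small stdlib-only sketch (host, port, and queue name are illustrative):

```python
from urllib.parse import quote


def queue_api_url(host, vhost, queue):
    # safe="" forces "/" to be encoded as "%2F" in the vhost segment.
    return f"http://{host}:15672/api/queues/{quote(vhost, safe='')}/{queue}"


print(queue_api_url("localhost", "/", "celery"))
# -> http://localhost:15672/api/queues/%2F/celery
```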

Regarding python - Celery: prevent adding more tasks when too many are already queued, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/55400216/
