gpt4 book ai didi

python - celery 任务是 "Received"是什么意思?当所有 celery worker 都被阻塞时,不是 "Received"的新任务会发生什么?

转载 作者:行者123 更新时间:2023-12-02 19:10:02 27 4
gpt4 key购买 nike

我正在开发一个新的监控系统,该系统可以测量 Celery 队列吞吐量并在队列备份时帮助提醒团队。在我的工作过程中,我遇到了一些我不理解的奇怪行为(并且在 Celery 规范中没有详细记录)。

出于测试目的,我设置了一个端点,它将用 16 个可用于模拟备份队列的长时间运行的任务填充队列。框架是 Flask,Queue broker 是 Redis。 Celery 为每个工作人员配置为最多并行处理 4 个任务,我有 2 个工作人员在运行。

api/health.py

def health():
health = Blueprint("health", __name__)

@health.route("/api/debug/create-long-queue", methods=["GET"])
def long_queue():
for i in range(16):
sleepy_job.delay()

return make_response({}, 200)

return health

jobs.py

@celery.task(priority=HIGH_PRIORITY)
def sleepy_job(*args, **kwargs):
time.sleep(30)

这是我模拟备份生产队列的方法:

  1. 我调用 /api/debug/create-long-queue 来模拟队列中的备份。根据上面的计算,worker 应该忙着睡 1 分钟(加起来,他们一次可以并发处理 8 个任务。每个任务只睡 30 秒,总共有 16 个任务。)
  2. 我在不久之后(< 5 秒)进行了另一个 API 调用,它启动了具有真实业务逻辑的另一项工作(处理入站 Webhook API 调用)。我们将此作业称为 handle_incoming_message

这是我看到的使用 flower 检查队列:

  • 虽然所有工作人员都被前 8 个 sleepy_job 任务阻塞,但我在队列中看不到新的 handle_incoming_message 的迹象,尽管我确定 handle_incoming_message .delay() 已作为第二次 API 调用的结果被调用。
  • 前 8 个 sleepy_job 任务完成后(约 30 秒),我在队列中看到状态为 RECIEVED 的新 handle_incoming_message
  • 在第二个(也是最后一个)8 个sleepy_job 任务完成后,我现在看到handle_incoming_message 处于STARTED 状态(我可以确认这是因为 UI 使用在该任务中接收和处理的新数据更新。)

问题

所以很明显,当工作人员在处理前 8 个 sleepy_job 任务后暂时解除阻塞时,他们正在做某事来标记/确认新的 handle_incoming_message 以花可见的方式完成任务。 但这留下了几个悬而未决的问题:

  • 当工作人员被阻塞时,新的 handle_incoming_message 任务的状态是什么?
  • worker 解除阻塞后发生了什么变化,使 flower 现在可以看到新的 handle_incoming_message 任务?
  • “RECEIVED”状态的实际含义是什么?
  • (奖励:我如何才能看到在工作人员被阻塞时排队的任务?)

最佳答案

  1. 当所有 worker 都被阻塞时,由于预取,一些任务可能处于已接收状态(请查看相关文档)。因此,您的任务很可能只是在队列中,等待 Celery 工作人员(协调进程 - 这些不是实际的工作进程)接收。

  2. Flower 是一种简单的服务,它建立在称为“任务事件”的 Celery 功能之上。简单来说,它 (Flower) 将自己订阅为所有事件(接收、成功、开始、失败等)的接收者,然后将这些事件可视化地呈现给 Web 客户端。 More about it here .因此,当 Celery worker 接收到任务时,将发送“任务接收”事件。 Flower 获取此事件,并在仪表板中更改该任务的状态。

  3. 当一个任务被“接收”时,这意味着特定的 Celery worker 将该任务从队列中取出并且它可能会立即执行(如果有空闲的 worker-process 来执行它),或者 Celery worker 将等待让工作进程准备好运行任务。我已经提到过预取 - Celery worker 通常会比可用的 worker-processes 承担更多的任务。

  4. Celery 没有为用户提供列出特定队列的方法。这就是为什么您会看到许多类似的问题 - 包括 this one which offers answers .你会在那里看到我的简短回答。简而言之,这取决于您选择的经纪人。如果是 Redis,那么您只需浏览对象列表即可。如果是 RabbitMQ,那么您可以使用他们的工具来检查队列。我认为不提供此信息的决定是好的,因为此信息永远不可靠。当您列出特定队列中的所有任务时,可能会有数千个新任务......

关于python - celery 任务是 "Received"是什么意思?当所有 celery worker 都被阻塞时,不是 "Received"的新任务会发生什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64433871/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com