gpt4 book ai didi

python - Celery 可以将状态更新传递给非阻塞调用者吗?

转载 作者:行者123 更新时间:2023-12-01 06:37:53 25 4
gpt4 key购买 nike

我正在使用Celery异步执行一组操作。这些操作有很多,每个操作都可能需要很长时间,因此我不想将结果发送回 Celery 工作函数的返回值中,而是希望将它们作为自定义状态更新一次发送回一个。这样,调用者可以通过更改状态回调实现进度条,并且工作函数的返回值可以是恒定大小,而不是与操作数量呈线性关系。

这是一个简单的示例,其中我使用 Celery 工作函数 add_pairs_of_numbers添加数字对列表,为每个添加的数字对发回自定义状态更新。

#!/usr/bin/env python

"""
Run worker with:

celery -A tasks worker --loglevel=info
"""
from celery import Celery

app = Celery("tasks", broker="pyamqp://guest@localhost//", backend="rpc://")

@app.task(bind=True)
def add_pairs_of_numbers(self, pairs):
for x, y in pairs:
self.update_state(state="SUM", meta={"x":x, "y":y, "x+y":x+y})
return len(pairs)

def handle_message(message):
if message["status"] == "SUM":
x = message["result"]["x"]
y = message["result"]["y"]
print(f"Message: {x} + {y} = {x+y}")

def non_looping(*pairs):
task = add_pairs_of_numbers.delay(pairs)
result = task.get(on_message=handle_message)
print(result)

def looping(*pairs):
task = add_pairs_of_numbers.delay(pairs)
print(task)
while True:
pass

if __name__ == "__main__":
import sys

if sys.argv[1:] and sys.argv[1] == "looping":
looping((3,4), (2,7), (5,5))
else:
non_looping((3,4), (2,7), (5,5))

如果你只运行./tasks它执行 non_looping功能。这会执行标准的 Celery 操作:延迟调用辅助函数,然后使用 get等待结果。一个handle_message回调函数打印每条消息,并返回添加的对数作为结果。这就是我想要的。

$ ./task.py
Message: 3 + 4 = 7
Message: 2 + 7 = 9
Message: 5 + 5 = 10
3

虽然非循环场景对于这个简单的示例来说已经足够了,但我试图完成的现实世界任务是处理一批文件而不是添加数字对。此外,客户是Flask REST API 因此不能包含任何阻塞 get来电。在上面的脚本中,我使用 looping 模拟此约束。功能。该函数启动异步 Celery 任务,但不等待响应。 (随后的无限 while 循环模拟 Web 服务器继续运行并处理其他请求。)

如果使用参数“looping”运行脚本,它将运行此代码路径。这里它立即打印 Celery 任务 ID,然后进入无限循环。

$ ./tasks.py looping
a39c54d3-2946-4f4e-a465-4cc3adc6cbe5

Celery工作日志显示执行了添加操作,但调用者没有定义回调函数,因此它永远无法获取结果。

(我意识到这个特定的示例是令人尴尬的并行,因此我可以使用 chunks 将其划分为多个任务。但是,在我的非简化的现实情况中,我有无法并行化的任务。)

我想要的是能够在 looping 中指定回调设想。像这样的东西。

def looping(*pairs):
task = add_pairs_of_numbers.delay(pairs, callback=handle_message) # There is no such callback.
print(task)
while True:
pass

在 Celery 文档和我可以在线找到的所有示例(例如 this )中,无法将回调函数定义为 delay 的一部分调用或其 apply_async相等的。您只能指定一个作为 get 的一部分打回来。这让我认为这是一个有意的设计决定。

在我的 REST API 场景中,我可以通过让 Celery 工作进程以 HTTP post 的形式将“状态更新”发送回 Flask 服务器来解决这个问题,但这看起来很奇怪,因为我开始复制消息传递HTTP 中的逻辑已经存在于 Celery 中。

有什么办法可以写我的looping以便调用者在不进行阻塞调用的情况下接收回调,或者在 Celery 中明确禁止这种情况?

最佳答案

这是 celery 不支持的模式,尽管您可以(在某种程度上)通过向任务发布自定义状态更新来欺骗它 as described here

Use update_state() to update a task’s state:.

def upload_files(self, filenames):
for i, file in enumerate(filenames):
if not self.request.called_directly:
self.update_state(state='PROGRESS',
meta={'current': i, 'total': len(filenames)})```

celery 不支持这种模式的原因是任务生产者(调用者)与任务消费者( worker )强烈解耦,两者之间唯一的通信是代理,以支持从生产者到消费者的通信以及结果支持从消费者到生产者的通信的后端。目前最接近的方法是轮询任务状态或编写自定义结果后端,以便您可以通过 AMP RPC 或 Redis 订阅发布事件。

关于python - Celery 可以将状态更新传递给非阻塞调用者吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59585652/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com