gpt4 book ai didi

python - 如何将 Celery 与 asyncio 结合使用?

转载 作者:IT老高 更新时间:2023-10-28 20:34:32 34 4
gpt4 key购买 nike

如何创建一个使 celery 任务看起来像 asyncio.Task 的包装器?或者有没有更好的方法将 Celery 与 asyncio 集成?

@asksol,Celery 的创造者,said this: :

It's quite common to use Celery as a distributed layer on top of async I/O frameworks (top tip: routing CPU-bound tasks to a prefork worker means they will not block your event loop).

但我找不到任何专门针对 asyncio 框架的代码示例。

最佳答案

编辑:2021 年 1 月 12 日以前的答案(在底部找到)没有很好地老化,因此我添加了可能的解决方案组合,这些解决方案可能会满足那些仍然在寻找如何共同使用 asyncio 和 celery

让我们先快速分解用例(更深入的分析:asyncio and coroutines vs task queues):

  • 如果任务受 I/O 限制,那么使用协程和异步会更好。
  • 如果任务受 CPU 限制,则最好使用 Celery 或其他类似的任务管理系统。

因此,在 Python 的“做一件事并把它做好”的上下文中,不要尝试将 asyncio 和 celery 混合在一起是有意义的。

但是,如果我们希望能够以异步方式和异步任务的形式运行方法,会发生什么情况?那么我们有一些选择可以考虑:

  • 我能找到的最好的例子如下:https://johnfraney.ca/posts/2018/12/20/writing-unit-tests-celery-tasks-async-functions/ (我刚刚发现它是 @Franey's response ):

    1. 定义你的异步方法。

    2. 使用asgirefsync.async_to_sync 模块来包装异步方法并在 celery 任务中同步运行它:

      # tasks.py
      import asyncio
      from asgiref.sync import async_to_sync
      from celery import Celery

      app = Celery('async_test', broker='a_broker_url_goes_here')

      async def return_hello():
      await asyncio.sleep(1)
      return 'hello'


      @app.task(name="sync_task")
      def sync_task():
      async_to_sync(return_hello)()
  • 我在 FastAPI 中遇到的一个用例应用程序与上一个示例相反:

    1. 一个 CPU 密集型进程正在占用异步端点。

    2. 解决方案是将异步 CPU 绑定(bind)的进程重构为一个 celery 任务,并从 Celery 队列中传递一个任务实例执行。

    3. 可视化该案例的最小示例:

      import asyncio
      import uvicorn

      from celery import Celery
      from fastapi import FastAPI

      app = FastAPI(title='Example')
      worker = Celery('worker', broker='a_broker_url_goes_here')

      @worker.task(name='cpu_boun')
      def cpu_bound_task():
      # Does stuff but let's simplify it
      print([n for n in range(1000)])

      @app.get('/calculate')
      async def calculate():
      cpu_bound_task.delay()

      if __name__ == "__main__":
      uvicorn.run('main:app', host='0.0.0.0', port=8000)
  • 另一种解决方案似乎是@juanra@danius在他们的答案中提出了建议,但我们必须记住,当我们混契约(Contract)步和异步执行时,性能往往会受到影响,因此在我们决定在 prod 环境中使用它们之前,需要监控这些答案。

最后,有一些现成的解决方案,我不能推荐(因为我自己没有使用过),但我会在这里列出它们:

  • Celery Pool AsyncIO这似乎完全解决了 Celery 5.0 没有解决的问题,但请记住,它似乎有点实验性(今天 0.2.0 版 01/12/2021)
  • aiotasks声称是“一个像 Celery 一样分发 Asyncio 协程的任务管理器”,但似乎有点陈旧(大约 2 年前的最新提交)

嗯,它的年龄没有那么好,是吗? Celery 5.0 版没有实现异步兼容性,因此我们无法知道何时以及是否会实现......出于响应遗留原因(因为它是当时的答案)和评论继续,将其留在这里。

如官方网站所述,这将从 Celery 5.0 版中实现:

http://docs.celeryproject.org/en/4.0/whatsnew-4.0.html#preface

  1. Celery 的下一个主要版本将仅支持 Python 3.5,我们计划在其中利用新的 asyncio 库。
  2. 放弃对 Python 2 的支持将使我们能够删除大量的兼容性代码,而使用 Python 3.5 使我们能够利用打字、异步/等待、异步和类似概念,而在旧版本中没有其他选择。

以上内容来自上一个链接。

所以最好的办法就是等待5.0版发布!

与此同时,祝你编码愉快:)

关于python - 如何将 Celery 与 asyncio 结合使用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39815771/

34 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com