gpt4 book ai didi

python - celery 可以合作运行协同程序作为有状态/可恢复的任务吗?

转载 作者:太空宇宙 更新时间:2023-11-04 01:12:54 25 4
gpt4 key购买 nike

我目前正在研究 Celery 在视频处理后端中的使用。基本上我的问题如下:

  1. 我有一个前端网络服务器,可以同时处理大量视频流(大约数千个)。
  2. 每个流必须独立并行处理。
  3. 流处理可以分为两类操作:
    1. 逐帧操作(不需要有关前一帧或后一帧信息的计算)
    2. 流级操作(对有序相邻帧的子集进行计算)

鉴于第 3 点,我需要在整个过程中维护和更新帧的有序结构,并将此结构的子部分的计算农场计算给 Celery 工作人员。最初,我想按如下方式组织事情:

[frontend server]  -stream-> [celery worker 1 (greenlet)] --> [celery worker 2 (prefork)]

想法是 celery worker 1 执行主要是 I/O 绑定(bind) 的长时间运行的任务。本质上,这些任务将执行以下操作:

  1. 从前端服务器读取一个帧
  2. 从帧的 base64 表示解码帧
  3. 将其放入上述有序数据结构(一个 collections.deque 对象,目前的状态)中。

任何 CPU 密集型操作( 图像分析)都被传送到 celery worker 2

我的问题如下:

我想执行一个协程作为一项任务,这样我就有一个长期运行的任务,我可以从中yield 以免阻塞 celery worker 1'操作。换句话说,我希望能够做类似于以下的事情:

def coroutine(func):
@wraps(func)
def start(*args, **kwargs):
cr = func(*args, **kwargs)
cr.next()
return cr
return start

@coroutine
def my_taks():
stream = deque() # collections.deque
source = MyAsynchronousInputThingy() # something i'll make myself, probably using select

while source.open:
if source.has_data:
stream.append(Frame(source.readline())) # read data, build frame and enqueue to persistent structure
yield # cooperatively interrupt so that other tasks can execute

有没有办法让基于协程的任务无限期地运行,理想情况下在 yielded 时产生结果?

最佳答案

Eventlet 背后的主要思想是您想要编写同步代码,与线程一样,socket.recv() 应该阻塞当前线程直到下一个语句。这种风格在调试时非常容易阅读、维护和推理。为了使事情变得有效和可扩展,在幕后,Eventlet 施展魔法,用绿色线程和 epoll/kqueue/etc 机制替换看似阻塞的代码,以在适当的时间唤醒这些绿色线程。

因此,您需要做的就是尽快执行 eventlet.monkey_patch()(例如模块中的第二行),并确保在 MyInputThingy 中使用纯 Python 套接字操作.忘掉异步,就像处理线程一样编写普通的阻塞代码。

Eventlet 让同步代码再次变得美好。

关于python - celery 可以合作运行协同程序作为有状态/可恢复的任务吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26566801/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com