gpt4 book ai didi

python - Celery 任务设置,将视频帧的内存缓存作为 python 中的循环缓冲区策略

转载 作者:行者123 更新时间:2023-12-01 08:51:46 27 4
gpt4 key购买 nike

我想在 Celery 上构建多任务处理管道并希望多个任务处理同一视频文件。任务需要共享视频数据。因此,并非每个任务都必须从视频文件中解码和提取帧。视频数据将是提取的帧的列表(并非需要视频的每一帧)。

有什么解决方案可以有效地共享这些帧吗?任务可以在不同的节点上处理。但我不想像 Memcached 或 Redis 那样通过网络共享数据。任务应该在内存/缓存中查找视频数据,如果不存在,则任务应该发出另一个任务来加载视频并提取帧到缓存。

(每个视频文件的生产者和多个消费者)

因此同一节点/机器上的任务能够共享缓存数据。不同节点上的两个任务没有缓存的好处。

我不想缓存整个提取的视频,必须有一些循环缓冲区缓存。每个视频的缓存大小固定,假设为 100 帧。最快和最慢任务之间的差距不能超过 100 帧。内存/缓存中总共只有 100 帧。

出现两个主要问题:

  1. 任务设置

    任务 A:从视频中提取帧(生产者到内存/缓存)

    任务 B1:使用帧并进行实际工作(处理帧)

    。.

    任务 Bn:消耗帧并执行实际工作(处理帧)

    A、B1 - Bn 并行运行。但这些任务必须在同一节点上运行。如果 B 任务分布在不同的节点上,则必须生成另一个 A 任务(每个节点上一个任务用于解码和提取帧)。您在这里推荐什么方法?最好的选择是什么?

  2. Python 缓存

    是否有最适合我的用例的缓存库/实现/解决方案,可以通过一些循环缓冲区实现在本地计算机上缓存大数据?类似于 DiskCache但能够通过环形缓冲仅缓存 100 帧。

您建议采用哪些方法和设计来实现我的用例?我想坚持使用 Celery 进行任务分配。

最佳答案

这可能是我的固执表现,但我总是发现像 celery 这样的项目在多处理(已经很复杂)的基础上增加了一堆复杂性,带来的麻烦比它们的值(value)还要多。从速度和简单性的角度来看,没有比使用 stdlib 共享内存和互斥体更好的替代方案了。

对于您的情况,一个简单的解决方案是为每个进程使用 fifo 队列,并将帧放入来自生产者的每个进程中。如果您为 n 个消费者制作每个帧的 n 个副本,这自然会产生大量内存使用量,但是您可能很容易想出一种机制将帧本身放入 multiprocessing.sharedctypes.Array 中。并仅将索引传递到队列中。只要队列的长度被限制为小于缓冲区的长度,就应该限制您覆盖缓冲区中的帧,直到它被所有消费者使用为止。如果没有任何同步,这将是凭直觉实现的,但一点点互斥魔法绝对可以使其成为一个非常强大的解决方案。

例如:

import numpy as np
from time import sleep
from multiprocessing import Process, freeze_support, Queue
from multiprocessing.sharedctypes import Array
from ctypes import c_uint8
from functools import reduce

BUFSHAPE = (10,10,10) #10 10x10 images in buffer

class Worker(Process):
def __init__(self, q_size, buffer, name=''):
super().__init__()
self.queue = Queue(q_size)
self.buffer = buffer
self.name = name

def run(self,): #do work here
#I hardcoded datatype here. you might need to communicate it to the child process
buf_arr = np.frombuffer(self.buffer.get_obj(), dtype=c_uint8)
buf_arr.shape = BUFSHAPE
while True:
item = self.queue.get()
if item == 'done':
print('child process: {} completed all frames'.format(self.name))
return
with self.buffer.get_lock(): #prevent writing while we're reading
#slice the frame from the array uning the index that was sent
frame = buf_arr[item%BUFSHAPE[0]] #depending on your use, you may want to make a copy here
#do some intense processing on `frame`
sleep(np.random.rand())
print('child process: {} completed frame: {}'.format(self.name, item))

def main():
#creating shared array
buffer = Array(c_uint8, reduce(lambda a,b: a*b, BUFSHAPE))
#make a numpy.array using that memory location to make it easy to stuff data into it
buf_arr = np.frombuffer(buffer.get_obj(), dtype=c_uint8)
buf_arr.shape = BUFSHAPE
#create a list of workers
workers = [Worker(BUFSHAPE[0]-2, #smaller queue than buffer to prevent overwriting frames not yet consumed
buffer, #pass in shared buffer array
str(i)) #numbered child processes
for i in range(5)] #5 workers

for worker in workers: #start the workers
worker.start()
for i in range(100): #generate 100 random frames to send to workers
#insert a frame into the buffer
with buffer.get_lock(): #prevent reading while we're writing
buf_arr[i%BUFSHAPE[0]] = np.random.randint(0,255, size=(10,10), dtype=c_uint8)
#send the frame number to each worker for processing. If the input queue is full, this will block until there's space
# this is what prevents `buf_arr[i%BUFSHAPE[0]] = np...` from overwriting a frame that hasn't been processed yet
for worker in workers:
worker.queue.put(i)
#when we're done send the 'done' signal so the child processes exit gracefully (or you could make them daemons)
for worker in workers:
worker.queue.put('done')
worker.join()


if __name__ == "__main__":
freeze_support()
main()

编辑

某种离一错误要求队列比缓冲区小 2 帧而不是小 1 帧,以防止在其时间之前覆盖帧。

EDIT2 - 第一次编辑的说明:

len(q) = len(buf)-2的原因似乎是q.get()在我们从缓冲区获取帧之前调用,并且在我们尝试将索引推送到队列之前写入帧本身。如果长度差异仅为 1,则工作线程可能会从队列中提取帧索引,然后生产者可能会发现它现在可以推送到队列并在工作线程有机会读取该帧之前移至下一帧本身。您可以通过多种方式以不同的方式处理此问题,从而减少始终相互等待的进程,也许可以使用 mp.Event .

关于python - Celery 任务设置,将视频帧的内存缓存作为 python 中的循环缓冲区策略,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53067309/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com