gpt4 book ai didi

Python 多处理,从远程 RTSP 摄像头读取,无需缓冲区

转载 作者:行者123 更新时间:2023-12-02 18:29:12 27 4
gpt4 key购买 nike

我有一个有两个进程的系统,其中:

  1. “读取器”进程,通过 RTSP 从远程摄像机获取帧;
  2. 从“阅读器”读取的帧被发送到“消费者”,以在其上运行一些计算机视觉算法。

现在的问题是,在“阅读器”中以 25 FPS 从相机读取帧,但在“消费者”中分析它们的速度显然要慢得多。然后,我不希望“消费者”分析所有这些数据,而只分析最新的可用数据(因此计算机视觉检测指的是实时流)。

类似于此处描述的内容: enter image description here

我设法通过解决方法使这项工作按照我想要的方式进行。
基本上,在阅读器中,我检查队列是否为空。如果没有,说明那里的帧还没有被分析,所以我将其删除并替换为当前使用的帧:

launcher.py -> 启动一切

from reader import Reader
from consumer import Consumer
import multiprocessing as mp
from multiprocessing import set_start_method, Queue, Event


def main():

set_start_method("spawn")
frames_queue = mp.Queue()
stop_switch = mp.Event()

reader = mp.Process(target=Reader, args=(frames_list,), daemon=True)
consumer = mp.Process(target=Consumer, args=(frames_list, stop_switch), daemon=True)

reader.start()
consumer.start()

while True:
if stop_switch.is_set():
reader.terminate()
consumer.terminate()
sys.exit(0)


if __name__ == "__main__":
main()

reader.py -> 从相机读取帧

import cv2

def Reader(thing):
cap = cv2.VideoCapture('rtsp_address')

while True:
ret, frame = cap.read()
if ret:
if not frames_queue.empty():
try:
frames_queue.get_nowait() # discard previous (unprocessed) frame
except queue.Empty:
pass

try:
frames_queue.put(cv2.resize(frame, (1080, 720)), block=False)
except:
pass

消费者中也有类似的东西:

consumer.py

import cv2

def Consumer(frames_queue, stop_switch):

while True:

try:
frame = frames_queue.get_nowait() ## get current camera frame from queue
except:
pass

if frame:
## do something computationally intensive on frame
cv2.imshow('output', cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

## stop system when pressing 'q' key
key = cv2.waitKey(1)
if key==ord('q'):
stop_switch.set()
break

但是我不太喜欢这个,看起来有点太乱了。另外,我必须使用所有 try/except block 来避免竞争条件,其中“读取器”在放置新帧之前清空队列,而“消费者”尝试同时获取帧。还有其他更好的方法吗?

最佳答案

这是我能想到的一些想法......

...我不是所谓的 MP 专家,但我在并行类型图像处理的生产代码中大量使用了 MP 和 MT。

  1. 思考您所说的话后的第一个问题 - 为什么要使用多重处理?

让我解释一下:您的最高级别问题可以表述为

  • “在最近的帧上执行一些计算密集型操作”
  • 警告:此“某事”所花费的时间远大于生成帧的时间。

在当前的代码中,您只是丢弃生成器中除最新帧之外的所有内容,正如您所说,生成帧比处理它们要快得多。

那么,考虑到这一点,为什么还要使用 MP?难道不是a)更简单,b)更快吗(元代码):


while I_should_be_running:

frame = get_latest_frame()
processed = do_work_on_frame()
save_if_needed(processed)

在这种情况下,您可能应该使用多线程来实现一种在主循环之外设置/取消设置I_should_be_running的方法,但关键点是工作您所做的一切都在一个过程中。

老实说,就 KISS 原则而言,我个人在尝试实现 MP 时总是遇到一些小问题和复杂性(至少对我来说),根据您提出的问题,这可能会很多明智的做法是在一个进程中按上述方式运行所有内容...

  • 如果您使用 MP,并且只想处理最新的帧,为什么要使用队列?
  • 生产者中的代码是

    • 清空当前队列
    • 获取新框架
    • 添加队列

    那么,为什么要使用队列呢?相反,也许只使用生产者写入的单个实例变量,例如 most_recent_frame

    您需要在进程之间设置锁定,以便消费者可以锁定它,以便它可以复制它,然后进行处理。但我相信 MP 库中有各种工具可以做到这一点。

    这将避免执行不需要的工作(即处理时间)带来的大量开销:创建未使用的对象实例,腌制队列中的这些对象(请参阅“管道和队列”的 https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue 部分 - 这很慢! )以及那些未使用的对象如果不使用则删除后进行垃圾回收。

    此外,您正在使用带有(理论上)生产者的队列,该生产者不断添加内容,但没有限制。虽然代码中不存在风险(因为您要删除生产者中的所有先前条目),但如果出现问题,这种方法可能会存在内存溢出的巨大风险。

  • 为什么要在基于Pipe的流程中使用连接对象(即队列)?
  • 当您在 MP 中使用队列时,它基于 Pipe,它将创建您使用它的对象的腌制副本。对于大型和/或复杂的对象来说,这可能会非常慢。

    相反,如果您关心最大速度,那么您可以使用 ctypes 共享内存对象(再次参见 https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue )。

    这确实需要锁定(MP 代码提供),但比大型/复杂对象的基于队列的对象选取/复制要快得多 - 您的框架可能是这样的。

    考虑到您尝试每秒生成数十次帧,这种 pickle/copy 开销可能会很大。

    关于Python 多处理,从远程 RTSP 摄像头读取,无需缓冲区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69661697/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com