gpt4 book ai didi

opencv - 通过 Autobahn WebSocket 流式传输视频

转载 作者:太空宇宙 更新时间:2023-11-03 22:31:20 26 4
gpt4 key购买 nike

我从 OpenCV VideoCapture.read() 捕获视频帧并将帧发送到 WebSocket 服务器(Twisted with Autobahn WebSocket API),我还使用 Twisted IPushProducer 接口(interface)将数据流式传输到 WebSocket,并最终在发送回客户端时清理摄像头。

这是我的代码。

server.py

import cv2
import cv2.cv as cv
import numpy as np
from autobahn.twisted.websocket import WebSocketServerProtocol, \
                                       WebSocketServerFactory, \
                                       listenWS
from VideoStreamClient import BATCH_SIZE

class VideoStreamServerProtocol(WebSocketServerProtocol):
    """Server protocol that receives a frame-streamed WebSocket message.

    Bytes of incoming frame data are counted; once the count reaches the
    batch threshold, the payload chunk that crossed it is sent back to the
    peer and the counter starts over.
    """

    def onConnect(self, request):
        """Print the peer address of the connecting client."""
        print("Client connecting: {0}".format(request.peer))

    def onOpen(self):
        """Print a notice that the connection is open."""
        print("WebSocket connection open.")

    def onClose(self, wasClean, code, reason):
        """Print the close reason; ``wasClean`` and ``code`` are unused."""
        print("WebSocket connection closed: {0}".format(reason))

    def onMessageBegin(self, isBinary):
        """Forward the start-of-message event to the base class."""
        WebSocketServerProtocol.onMessageBegin(self, isBinary)

    def onMessageFrameBegin(self, length):
        """Forward the event to the base class and reset the batch state."""
        WebSocketServerProtocol.onMessageFrameBegin(self, length)
        self.received = 0
        # BATCH_SIZE is expected to be available at module level.
        self.next = BATCH_SIZE

    def onMessageFrameData(self, payload):
        """Count the received bytes and echo a chunk once a batch is full.

        Only the chunk that crosses the threshold is sent back; earlier
        chunks of the same batch are counted but not retained.
        """
        self.received += len(payload)
        if self.received >= self.next:
            # NOTE(review): confirm that sendMessageFrameData accepts an
            # ``isBinary`` keyword in the Autobahn version in use, and that
            # an outgoing message/frame has been begun before this call.
            self.sendMessageFrameData(payload, isBinary=True)
            self.received = 0

    def onMessageFrameEnd(self):
        """Do nothing when a frame ends."""
        pass

    def onMessageEnd(self):
        """Do nothing when a message ends."""
        pass

class VideoStreamServerFactory(WebSocketServerFactory):
    """Factory whose connections are handled by VideoStreamServerProtocol."""

    protocol = VideoStreamServerProtocol

    def __init__(self):
        """Initialise the base factory for ws://localhost:9000, debug off."""
        WebSocketServerFactory.__init__(
            self, "ws://localhost:9000", debug=False)

if __name__ == '__main__':
    import sys
    from twisted.python import log
    from twisted.internet import reactor

    # Send Twisted's log output to stdout.
    log.startLogging(sys.stdout)

    # Start listening with the video stream factory, then hand control to
    # the reactor; reactor.run() does not return until the reactor stops.
    factory = VideoStreamServerFactory()
    listenWS(factory)
    reactor.run()

client.py

from autobahn.twisted.websocket import WebSocketClientFactory, \
WebSocketClientProtocol, \
connectWS

from zope.interface import implementer
from twisted.internet import reactor, interfaces

import cv2
import numpy as np

FRAME_SIZE = 0x7FFFFFFFFFFFFFFF
BATCH_SIZE = 1 * 2**20

@implementer(interfaces.IPushProducer)
class VideoStreamProducer:
    """Push producer that writes JPEG-encoded camera frames to a protocol.

    The camera is opened lazily on the first ``resumeProducing`` call and
    released in ``stopProducing``.
    """

    def __init__(self, proto):
        """Store the protocol to write to; nothing is opened yet.

        :param proto: protocol object providing ``beginMessage``,
            ``beginMessageFrame`` and ``sendMessageFrameData``.
        """
        self.proto = proto
        self.started = False
        self.paused = False
        # Set on first resumeProducing(); kept as None until then so that
        # stopProducing() is safe to call on a producer that never started.
        self.cap = None

    def pauseProducing(self):
        """Ask the send loop in ``resumeProducing`` to stop."""
        self.paused = True

    def resumeProducing(self):
        """Open the camera on first use, then send frames until paused."""
        self.paused = False
        if not self.started:
            # One-time setup: open camera 0 at 640x480 and begin a single
            # binary message with one frame of FRAME_SIZE bytes.
            self.cap = cv2.VideoCapture(0)
            self.cap.set(cv2.cv.CV_CAP_PROP_FRAME_WIDTH, 640)
            self.cap.set(cv2.cv.CV_CAP_PROP_FRAME_HEIGHT, 480)

            self.proto.beginMessage(isBinary=True)
            self.proto.beginMessageFrame(FRAME_SIZE)
            self.started = True
        # NOTE(review): this loop only exits once self.paused is set, and
        # it never returns to the caller in between iterations.
        while not self.paused:
            isSuccess, frame = self.cap.read()
            # NOTE(review): encode_param is not defined anywhere in the
            # visible code, and the frame is encoded before isSuccess is
            # checked - confirm both against the full program.
            _, data = cv2.imencode(".jpg", frame, encode_param)
            if isSuccess:
                # A return value <= 0 is treated as "current frame is
                # full", so a fresh frame is begun.
                if self.proto.sendMessageFrameData(data) <= 0:
                    self.proto.beginMessageFrame(FRAME_SIZE)

    def stopProducing(self):
        """Release the camera if it was ever opened."""
        if self.cap is not None:
            self.cap.release()

class VideoStreamClientProtocol(WebSocketClientProtocol):
    """Client protocol that streams camera frames once the socket is open."""

    def onConnect(self, response):
        """Do nothing on connect."""
        pass

    def onOpen(self):
        """Create the video producer, register it, and start it."""
        producer = VideoStreamProducer(self)
        # Second argument True registers it as a streaming (push) producer.
        self.registerProducer(producer, True)
        producer.resumeProducing()

    def onMessage(self, payload, isBinary):
        """Print the byte length of each message received."""
        print(len(payload))

if __name__ == '__main__':
    # Connect to the local server using the streaming client protocol,
    # then run the reactor.
    factory = WebSocketClientFactory("ws://localhost:9000")
    factory.protocol = VideoStreamClientProtocol
    connectWS(factory)
    reactor.run()

最佳答案

这是一个无限循环,一旦开始就阻止任何事件处理发生:

# Quoted from resumeProducing(): the loop keeps running until self.paused
# is set and does not return to its caller between iterations.
while not self.paused:
    isSuccess, frame = self.cap.read()
    _, data = cv2.imencode(".jpg",frame,encode_param)
    if isSuccess:
        if self.proto.sendMessageFrameData(data)<=0:
            self.proto.beginMessageFrame(FRAME_SIZE)

请记住,Twisted 使用单线程协作式多任务处理来实现并发。此循环占用了(唯一的)reactor 线程,并且不与其他任务协作。

如果您想重复运行某些代码,请查看 twisted.internet.task.LoopingCall 或 twisted.internet.task.cooperate。

关于opencv - 通过 Autobahn WebSocket 流式传输视频,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23494567/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com