gpt4 book ai didi

python - FastAPI 中音频流的 Websockets 桥接器

转载 作者:行者123 更新时间:2023-12-04 16:25:27 28 4
gpt4 key购买 nike

客观的
我的目标是使用音频流。从逻辑上讲,这是我的目标:

  • 音频流来自 WebSocket A(FastAPI 端点)
  • 音频流被桥接到不同的 WebSocket,B,它将返回一个 JSON(Rev-ai 的 WebSocket)
  • Json 结果通过 WebSocket A 实时发回。因此,而音频流仍在传入。

  • 可能的解决方案
    为了解决这个问题,我有很多想法,但最终我一直在尝试桥接 WebSocket AWebSocket B .到目前为止,我的尝试涉及 ConnectionManager类,其中包含 Queue.queue .音频流的块被添加到这个队列中,这样我们就不会直接从 WebSocket A 消费。 . ConnectionManager还包含一个生成器方法来从队列中生成所有值。
    我的 FastAPI 实现消耗来自 websocket A像这样:
    @app.websocket("/ws")
    async def predict_feature(websocket: WebSocket):
    await manager.connect(websocket)
    try:
    while True:
    chunk = await websocket.receive_bytes()
    manager.add_to_buffer(chunk)
    except KeyboardInterrupt:
    manager.disconnect()
    在此摄取的同时,我希望有一项任务可以将我们的音频流桥接到 WebSocket B ,并将获取到的值发送到 WebSocket A .音频流可以通过前面提到的 generator 消费。方法。
    由于WebSocket B如何消费消息,生成器方法是必要的,如Rev-ai的 examples所示。 :
    streamclient = RevAiStreamingClient(access_token, config)
    response_generator = streamclient.start(MEDIA_GENERATOR)
    for response in response_generator:
    # return through websocket A this value
    print(response)
    这是最大的挑战之一,因为我们需要将数据消耗到生成器中并实时获取结果。
    最新尝试
    我一直在与 asyncio 碰碰运气;根据我的理解,一种可能性是创建一个在后台运行的协程。我一直没有成功,但听起来很有希望。
    我想过通过 FastAPI 触发这个启动事件,但我无法实现并发。我尝试使用 event_loops ,但它给了我一个 nested event loop相关错误。
    警告
    如果您的见解认为如此,FastAPI 可以是可选的,在某种程度上,WebSocket A 也是如此。在一天结束时,最终目标是通过我们自己的 API 端点接收音频流,通过 Rev.ai 的 WebSocket 运行它,做一些额外的处理,并将结果发回。

    最佳答案

    websocket的桥接<-> websocket
    下面是一个简单的 webscoket 代理示例,其中 websocket A和 websocket B都是 FastAPI 应用程序中的端点,但 websocket B可以位于其他地方,只需更改其地址 ws_b_uri .对于 websocket 客户端 websockets图书馆被使用。
    进行数据转发,代码A端点启动两个任务 forwardreverse并通过 asyncio.gather() 等待它们完成.两个方向的数据传输以并行方式发生。

    import asyncio

    from fastapi import FastAPI
    from fastapi import WebSocket
    import websockets
    app = FastAPI()

    ws_b_uri = "ws://localhost:8001/ws_b"


    async def forward(ws_a: WebSocket, ws_b: websockets.WebSocketClientProtocol):
    while True:
    data = await ws_a.receive_bytes()
    print("websocket A received:", data)
    await ws_b.send(data)


    async def reverse(ws_a: WebSocket, ws_b: websockets.WebSocketClientProtocol):
    while True:
    data = await ws_b.recv()
    await ws_a.send_text(data)
    print("websocket A sent:", data)


    @app.websocket("/ws_a")
    async def websocket_a(ws_a: WebSocket):
    await ws_a.accept()
    async with websockets.connect(ws_b_uri) as ws_b_client:
    fwd_task = asyncio.create_task(forward(ws_a, ws_b_client))
    rev_task = asyncio.create_task(reverse(ws_a, ws_b_client))
    await asyncio.gather(fwd_task, rev_task)


    @app.websocket("/ws_b")
    async def websocket_b(ws_b_server: WebSocket):
    await ws_b_server.accept()
    while True:
    data = await ws_b_server.receive_bytes()
    print("websocket B server recieved: ", data)
    await ws_b_server.send_text('{"response": "value from B server"}')
    更新(桥 websocket <-> 同步生成器)
    考虑到问题的最后更新,问题是 WebSocket B隐藏在一个同步生成器后面(实际上有两个,一个用于输入,另一个用于输出)实际上,任务变成了如何在 WebSocket 和同步生成器之间建立桥梁。而且因为我从来没有使用过 rev-ai库,我做了一个 stub 函数 stream_client_startstreamclient.start它接受一个生成器(原版 MEDIA_GENERATOR)并返回一个生成器(原版 response_generator)。
    在这种情况下,我通过 run_in_executor 在单独的线程中开始处理生成器。 ,并且为了不重新发明轮子,我使用来自 janus 的队列进行通信。库,它允许您通过队列绑定(bind)同步和异步代码。相应地,也有两个队列,一个用于 A -> B ,其他为 B -> A .

    import asyncio
    import time
    from typing import Generator
    from fastapi import FastAPI
    from fastapi import WebSocket
    import janus
    import queue

    app = FastAPI()


    # Stub generator function (using websocket B in internal)
    def stream_client_start(input_gen: Generator) -> Generator:
    for chunk in input_gen:
    time.sleep(1)
    yield f"Get {chunk}"


    # queue to generator auxiliary adapter
    def queue_to_generator(sync_queue: queue.Queue) -> Generator:
    while True:
    yield sync_queue.get()


    async def forward(ws_a: WebSocket, queue_b):
    while True:
    data = await ws_a.receive_bytes()
    print("websocket A received:", data)
    await queue_b.put(data)


    async def reverse(ws_a: WebSocket, queue_b):
    while True:
    data = await queue_b.get()
    await ws_a.send_text(data)
    print("websocket A sent:", data)


    def process_b_client(fwd_queue, rev_queue):
    response_generator = stream_client_start(queue_to_generator(fwd_queue))
    for r in response_generator:
    rev_queue.put(r)


    @app.websocket("/ws_a")
    async def websocket_a(ws_a: WebSocket):
    loop = asyncio.get_event_loop()
    fwd_queue = janus.Queue()
    rev_queue = janus.Queue()
    await ws_a.accept()

    process_client_task = loop.run_in_executor(None, process_b_client, fwd_queue.sync_q, rev_queue.sync_q)
    fwd_task = asyncio.create_task(forward(ws_a, fwd_queue.async_q))
    rev_task = asyncio.create_task(reverse(ws_a, rev_queue.async_q))
    await asyncio.gather(process_client_task, fwd_task, rev_task)

    关于python - FastAPI 中音频流的 Websockets 桥接器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65361686/

    28 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com