gpt4 book ai didi

python - Asyncio + aiohttp - redis Pub/Sub 和 websocket 在单个处理程序中读/写

转载 作者:IT王子 更新时间:2023-10-29 05:54:58 45 4
gpt4 key购买 nike

我目前正在玩 aiohttp看看它将如何作为具有 websocket 连接的移动应用程序的服务器应用程序执行。

这是简单的“Hello world”示例 ( as gist here ):

import asyncio
import aiohttp
from aiohttp import web


class WebsocketEchoHandler:

@asyncio.coroutine
def __call__(self, request):
ws = web.WebSocketResponse()
ws.start(request)

print('Connection opened')
try:
while True:
msg = yield from ws.receive()
ws.send_str(msg.data + '/answer')
except:
pass
finally:
print('Connection closed')
return ws


if __name__ == "__main__":
app = aiohttp.web.Application()
app.router.add_route('GET', '/ws', WebsocketEchoHandler())

loop = asyncio.get_event_loop()
handler = app.make_handler()

f = loop.create_server(
handler,
'127.0.0.1',
8080,
)

srv = loop.run_until_complete(f)
print("Server started at {sock[0]}:{sock[1]}".format(
sock=srv.sockets[0].getsockname()
))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
loop.run_until_complete(handler.finish_connections(1.0))
srv.close()
loop.run_until_complete(srv.wait_closed())
loop.run_until_complete(app.finish())
loop.close()

问题

现在我想使用下面描述的结构(节点服务器 = python aiohttp)。更具体地说,使用 Redis Pub/Sub机制与 asyncio-redis在我的 WebsocketEchoHandler 中读取和写入 websocket 连接和 Redis。

WebsocketEchoHandler 是一个非常简单的循环,所以我不确定应该如何完成。使用 Tornadobrükva我只会使用回调。

http://goldfirestudios.com/blog/136/Horizontally-Scaling-Node.js-and-WebSockets-with-Redis

额外的(可能是题外话)问题

因为我正在使用 Redis已经,我应该采用以下两种方法中的哪一种:

  1. 就像在“经典”网络应用程序中一样,对所有内容都有一个 Controller / View ,使用 Redis仅用于消息传递等。
  2. Web 应用程序应该只是客户端和 Redis 之间的一个层也用作任务队列(最简单的 Python RQ )。每个请求都应委托(delegate)给工作人员。

编辑

图片来自http://goldfirestudios.com/blog/136/Horizontally-Scaling-Node.js-and-WebSockets-with-Redis

编辑 2

看来我需要澄清一下。

  • 仅 Websocket 处理程序如上所示
  • Redis Pub/Sub 处理程序可能如下所示:

    class WebsocketEchoHandler:

    @asyncio.coroutine
    def __call__(self, request):
    ws = web.WebSocketResponse()
    ws.start(request)

    connection = yield from asyncio_redis.Connection.create(host='127.0.0.1', port=6379)
    subscriber = yield from connection.start_subscribe()
    yield from subscriber.subscribe(['ch1', 'ch2'])

    print('Connection opened')
    try:
    while True:
    msg = yield from subscriber.next_published()
    ws.send_str(msg.value + '/answer')
    except:
    pass
    finally:
    print('Connection closed')
    return ws

    此处理程序仅订阅 Redis channel ch1ch2 并将从这些 channel 收到的每条消息发送到 websocket。

  • 我想要这个处理程序:

    class WebsocketEchoHandler:

    @asyncio.coroutine
    def __call__(self, request):
    ws = web.WebSocketResponse()
    ws.start(request)

    connection = yield from asyncio_redis.Connection.create(host='127.0.0.1', port=6379)
    subscriber = yield from connection.start_subscribe()
    yield from subscriber.subscribe(['ch1', 'ch2'])

    print('Connection opened')
    try:
    while True:
    # If message recived from redis OR from websocket
    msg_ws = yield from ws.receive()
    msg_redis = yield from subscriber.next_published()
    if msg_ws:
    # push to redis / do something else
    self.on_msg_from_ws(msg_ws)
    if msg_redis:
    self.on_msg_from_redis(msg_redis)
    except:
    pass
    finally:
    print('Connection closed')
    return ws

    但以下代码总是按顺序调用,因此从 websocket 读取会阻止从 Redis 读取:

    msg_ws = yield from ws.receive()
    msg_redis = yield from subscriber.next_published()

我希望在事件上完成阅读,其中事件是从两个来源之一收到的消息。

最佳答案

您应该使用两个 while 循环 - 一个处理来自 websocket 的消息,一个处理来自 redis 的消息。您的主处理程序可以启动两个协程,一个处理每个循环,然后等待它们两个:

class WebsocketEchoHandler:
@asyncio.coroutine
def __call__(self, request):
ws = web.WebSocketResponse()
ws.start(request)

connection = yield from asyncio_redis.Connection.create(host='127.0.0.1', port=6379)
subscriber = yield from connection.start_subscribe()
yield from subscriber.subscribe(['ch1', 'ch2'])

print('Connection opened')
try:
# Kick off both coroutines in parallel, and then block
# until both are completed.
yield from asyncio.gather(self.handle_ws(ws), self.handle_redis(subscriber))
except Exception as e: # Don't do except: pass
import traceback
traceback.print_exc()
finally:
print('Connection closed')
return ws

@asyncio.coroutine
def handle_ws(self, ws):
while True:
msg_ws = yield from ws.receive()
if msg_ws:
self.on_msg_from_ws(msg_ws)

@asyncio.coroutine
def handle_redis(self, subscriber):
while True:
msg_redis = yield from subscriber.next_published()
if msg_redis:
self.on_msg_from_redis(msg_redis)

通过这种方式,您可以从两个潜在来源中的任何一个读取信息,而无需关心另一个。

关于python - Asyncio + aiohttp - redis Pub/Sub 和 websocket 在单个处理程序中读/写,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31670127/

45 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com