gpt4 book ai didi

Python 异步流 API

转载 作者:太空狗 更新时间:2023-10-30 02:44:03 25 4
gpt4 key购买 nike

我正在寻找一些关于 Python asyncio 包中 StreamReader 和 StreamWriter 类的使用模式的“超越基本”的指导。

我正在尝试使用 protobuf 构建带有自定义协议(protocol)的有状态服务器。我是否应该对 StreamReader 和 StreamWriter 进行子类化以管理来自 protobuf 字节的序列化?然后我可以在阅读器上提供一个 read_message 函数。我知道我可以从提供我自己的 StreamReader 的 streams.start_server 复制代码,但我该如何设置我的 StreamWriter?

非常感谢收到任何指示或示例。

最佳答案

我发现将 asyncio.streams 库类子类化相对简单。

start_server 函数是从 tcp 服务器示例中提取的:

@asyncio.coroutine
def start_server(self, loop):
def factory():
reader = QbpStreamReader()
return QbpStreamReaderProtocol(reader, self._accept_client)

logger.info("QbpServer starting at tcp://%s:%s", self.host, self.port)
self.server = yield from loop.create_server(factory, self.host, self.port)

为了构建我自己的 StreamWriter,有必要对 StreamReaderProtocol 进行子类化。除此之外,这与库函数相同。

class QbpStreamReaderProtocol(streams.StreamReaderProtocol):
def connection_made(self, transport):
self._stream_reader.set_transport(transport)
if self._client_connected_cb is not None:
self._stream_writer = QbpStreamWriter(transport, self,
self._stream_reader,
self._loop)
res = self._client_connected_cb(self._stream_reader,
self._stream_writer)
if coroutines.iscoroutine(res):
self._loop.create_task(res)

对于传出消息:

class QbpStreamWriter(streams.StreamWriter):
def write_msg(self, msg):
# data = serialise message
self.write(data)

对于传入的消息:

class QbpStreamReader(streams.StreamReader):
@asyncio.coroutine
def read_msg(self):
data = yield from self.readexactly(header_length)
# msg_type, msg_length = unpack header
data = yield from self.readexactly(msg_length)
return build_message(msg_type, data)

希望对大家有帮助

关于Python 异步流 API,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31077182/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com