gpt4 book ai didi

python - 如何将 python-trio 与谷歌 Protocol Buffer 一起使用?

转载 作者:行者123 更新时间:2023-11-28 17:59:02 25 4
gpt4 key购买 nike

我正在尝试在 python 中使用 protobuf 读取一些数据流,我想使用 trio 使客户端读取流。 protobuf 有一些方法调用,我发现当我使用 trio 流时它们不起作用。

Linux 机器上的 Python 客户端。

import DTCProtocol_pb2 as Dtc

async def parent(addr, encoding, heartbeat_interval):
print(f"parent: connecting to 127.0.0.1:{addr[1]}")
client_stream = await trio.open_tcp_stream(addr[0], addr[1])

# encoding request
print("parent: spawing encoding request ...")
enc_req = create_enc_req(encoding) # construct encoding request
await send_message(enc_req, Dtc.ENCODING_REQUEST,client_stream, 'encoding request') # send encoding request

log.debug('get_reponse: started')
response = await client_stream.receive_some(1024)
m_size = struct.unpack_from('<H', response[:2]) # the size of message
m_type = struct.unpack_from('<H', response[2:4]) # the type of the message
m_body = response[4:]
m_resp = Dtc.EncodingResponse()

m_body 是一些字节数据,我不知道如何解码。 Dtc.EncodingResponse() 是 protobuf 方法,它会给出一个包含可读格式响应的 Dtc 对象。 (dtc 是 protobuf 文件)。但我在这里一无所获。当我在没有三重奏的情况下执行此脚本时,Dtc.EncodingResponse() 将以可读格式给出完整的响应。

我猜问题是“client_stream”是一个只读取字节的三重流对象,所以我可能需要使用 ReceiveChannel 对象来代替。但如果这是真的,我不知道该怎么做。

更新:Nathaniel J. Smith 的以下回答解决了我的问题。

m_resp = Dtc.EncodingResponse()
m_resp.ParseFromString(m_body)

我觉得好傻,但是我之前没有ParseFromString数据,就这样了。非常感谢所有给予答复的人。希望这可以帮助那里的人。

最佳答案

就像@shmee 在评论中所说的那样,我认为您的代码因编辑而被破坏了一些...您应该仔细检查。

When I did this script without trio, Dtc.EncodingResponse() would give the full response in readable format

我想您在切换到 Trio 时可能掉线了? Dtc.EncodingResponse() 只是创建一个新的空 EncodingResponse 对象。如果你想将数据从 m_body 解析到你的新对象中,你必须明确地这样做,像这样:

m_resp = Dtc.EncodingResponse()
m_resp.ParseFromString(m_body)

但是,还有另外一个问题……它被称为receive_some 的原因是它接收了一些 字节,但可能没有接收到所有你要求的字节。您的代码假设对 receive_some 的一次调用将获取响应中的所有字节,当您进行简单测试时这可能是正确的,但通常不能保证。如果您在第一次调用 receive_some 时没有获得足够的数据,您可能需要不断重复调用它,直到获得所有数据。

这实际上是非常标准的……套接字的工作方式相同。这就是为什么您的服务器首先要在开头发送一个 m_size 字段的原因 - 这样您就可以判断您是否已获取所有数据!

不幸的是,截至 2019 年 6 月,Trio 没有提供 helper 来为您执行此循环——您可以在 this issue 中跟踪进度.同时,您可以自己编写。我认为这样的事情应该可行:

async def receive_exactly(stream, count):
buf = bytearray()
while len(buf) < count:
new_data = await stream.receive_some(count - len(buf))
if not new_data:
raise RuntimeError("other side closed the connection unexpectedly")
buf += new data
return buf

async def receive_encoding_response(stream):
header = await receive_exactly(stream, 4)
(m_size, m_type) = struct.unpack('<HH', header)
m_body = await receive_exactly(stream, m_size)
m_resp = Dtc.EncodingResponse()
m_resp.ParseFromString(m_size)
return m_resp

关于python - 如何将 python-trio 与谷歌 Protocol Buffer 一起使用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56592227/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com