
python - Reading and writing serialized protobufs in Beam

Reposted. Author: 太空宇宙. Updated: 2023-11-03 14:47:11

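I want to write a PCollection of serialized protobuf messages to text files and read them back, which should be easy. But after several attempts I still have not managed to do it. Any comments would be much appreciated.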

// definition of proto.

syntax = "proto3";
package test;
message PhoneNumber {
  string number = 1;
  string country = 2;
}

I have the Python code below, which implements a simple Beam pipeline that converts text into serialized protobufs and writes them out.

# Test python code
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import phone_pb2

class ToProtoFn(beam.DoFn):
    def process(self, element):
        # Each input element is a "number,country" line.
        phone = phone_pb2.PhoneNumber()
        phone.number, phone.country = element.strip().split(',')
        yield phone.SerializeToString()

with beam.Pipeline(options=PipelineOptions()) as p:
    lines = (p
        | beam.Create(["123-456-789,us", "345-567-789,ca"])
        | beam.ParDo(ToProtoFn())
        | beam.io.WriteToText('/Users/greeness/data/phone-pb'))

The pipeline runs successfully and produces a file with the following content:

$ cat ~/data/phone-pb-00000-of-00001 


123-456-789us


345-567-789ca

Then I wrote another pipeline that reads the serialized protobufs and parses them with a ParDo.

class ToCsvFn(beam.DoFn):
    def process(self, element):
        # Each element is expected to be one serialized PhoneNumber.
        phone = phone_pb2.PhoneNumber()
        phone.ParseFromString(element)
        yield ",".join([phone.number, phone.country])

with beam.Pipeline(options=PipelineOptions()) as p:
    lines = (p
        | beam.io.ReadFromText('/Users/greeness/data/phone*')
        | beam.ParDo(ToCsvFn())
        | beam.io.WriteToText('/Users/greeness/data/phone-csv'))

I get this error message when running it.

  File "/Library/Python/2.7/site-packages/apache_beam/runners/common.py", line 458, in process_outputs
    for result in results:
  File "phone_example.py", line 37, in process
    phone.ParseFromString(element)
  File "/Library/Python/2.7/site-packages/google/protobuf/message.py", line 185, in ParseFromString
    self.MergeFromString(serialized)
  File "/Library/Python/2.7/site-packages/google/protobuf/internal/python_message.py", line 1069, in MergeFromString
    raise message_mod.DecodeError('Truncated message.')
DecodeError: Truncated message. [while running 'ParDo(ToCsvFn)']

So it looks like the serialized protobuf strings cannot be parsed. Am I missing something? Thanks for your help!
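A likely cause, sketched here without Beam or the generated phone_pb2 module: in the protobuf wire format, the tag byte for field 1 with a length-delimited value is 0x0A, which is also the newline character. WriteToText separates elements with newlines and ReadFromText splits on them, so a line-based reader would hand back fragments rather than whole messages. The helper names below are hypothetical, and the bytes are hand-encoded under the assumption that field numbers are below 16 and values are shorter than 128 bytes.

```python
def encode_string_field(field_number: int, value: str) -> bytes:
    """Hand-encode one length-delimited (wire type 2) string field.

    Assumption: tag and length each fit in a single byte.
    """
    data = value.encode("utf-8")
    tag = (field_number << 3) | 2  # field 1 -> 0x0A, field 2 -> 0x12
    return bytes([tag, len(data)]) + data


def encode_phone(number: str, country: str) -> bytes:
    """Bytes equivalent to serializing PhoneNumber(number, country)."""
    return encode_string_field(1, number) + encode_string_field(2, country)


record = encode_phone("123-456-789", "us")
print(record)  # b'\n\x0b123-456-789\x12\x02us'

# A text sink appends a newline after each element ...
file_content = record + b"\n"
# ... and a line-based reader splits on every newline byte it sees,
# including the one that is really the tag of field 1.
lines = file_content.split(b"\n")[:-1]
print(lines)  # [b'', b'\x0b123-456-789\x12\x02us']
```

Neither fragment equals the original record: the first is empty and the second has lost its leading tag byte, which would be consistent with the blank lines in the cat output and with the parser failing on the element it receives.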

Best answer

I found a temporary workaround by going through the TFRecord I/O implemented in tfrecordio.py.
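The reason a record-oriented format helps can be illustrated with a simplified sketch. TFRecord files store the byte length of each record ahead of its payload (together with checksums), so no byte inside the payload is ever treated as a separator. The code below is only a minimal length-prefix framing with hypothetical helper names, not the actual TFRecord layout: it omits the checksum fields entirely.

```python
import io
import struct


def write_records(stream, records):
    """Write each record as an 8-byte little-endian length plus payload."""
    for record in records:
        stream.write(struct.pack("<Q", len(record)))
        stream.write(record)


def read_records(stream):
    """Yield payloads back by reading exactly as many bytes as the header says."""
    while True:
        header = stream.read(8)
        if not header:
            return  # clean end of stream
        if len(header) < 8:
            raise ValueError("truncated length header")
        (length,) = struct.unpack("<Q", header)
        payload = stream.read(length)
        if len(payload) != length:
            raise ValueError("truncated record")
        yield payload


# Payloads that contain newline bytes, like serialized protobufs can.
records = [b"\n\x0b123-456-789\x12\x02us", b"\n\x0b345-567-789\x12\x02ca"]
buffer = io.BytesIO()
write_records(buffer, records)
buffer.seek(0)
round_tripped = list(read_records(buffer))
```

Because the reader never scans for a delimiter, the embedded newline bytes come back unchanged and each payload is returned whole.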

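The code below, which uses WriteToTFRecord and ReadFromTFRecord, works. But I am still open to any comments that solve the problem above.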

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import phone_pb2

def WriteTextToTFRecord():
    class ToProtoFn(beam.DoFn):
        def process(self, element):
            phone = phone_pb2.PhoneNumber()
            phone.number, phone.country = element.strip().split(',')
            # Yield the message itself; ProtoCoder handles serialization.
            yield phone
    with beam.Pipeline(options=PipelineOptions()) as p:
        lines = p | beam.Create(["123-456-789,us", "345-567-789,ca"])
        processed = (
            lines
            | beam.ParDo(ToProtoFn())
            | beam.io.WriteToTFRecord('/Users/greeness/data/phone-pb',
                coder=beam.coders.ProtoCoder(phone_pb2.PhoneNumber)))

def ReadTFRecordAndSaveAsCSV():
    class ToCsvFn(beam.DoFn):
        def process(self, element):
            # element is already a decoded PhoneNumber message.
            yield ','.join([element.number, element.country])
    with beam.Pipeline(options=PipelineOptions()) as p:
        lines = (p
            | beam.io.ReadFromTFRecord('/Users/greeness/data/phone-pb-*',
                coder=beam.coders.ProtoCoder(phone_pb2.PhoneNumber))
            | beam.ParDo(ToCsvFn())
            | beam.io.WriteToText('/Users/greeness/data/phone-csv'))

if __name__ == '__main__':
    WriteTextToTFRecord()
    ReadTFRecordAndSaveAsCSV()
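
If the output has to remain a line-oriented text file, one common alternative (not part of the answer above) is to Base64-encode each serialized message before it reaches the text sink and decode it after reading. Base64 output contains no newline bytes, so one line corresponds to exactly one message. The function names here are hypothetical, and the sample bytes stand in for what SerializeToString() would return.

```python
import base64


def to_text_line(serialized: bytes) -> str:
    """Encode serialized message bytes as a newline-free ASCII string."""
    return base64.b64encode(serialized).decode("ascii")


def from_text_line(line: str) -> bytes:
    """Recover the original serialized bytes from one text line."""
    return base64.b64decode(line)


# Stand-in for phone.SerializeToString(); note the embedded newline byte.
serialized = b"\n\x0b123-456-789\x12\x02us"
line = to_text_line(serialized)
restored = from_text_line(line)
```

In a pipeline, these would presumably be applied with beam.Map(to_text_line) just before WriteToText and beam.Map(from_text_line) just after ReadFromText, with ParseFromString called on the restored bytes. The trade-off is that the file grows by roughly a third and is no longer raw protobuf.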

Regarding "python - Reading and writing serialized protobufs in Beam", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/48373131/
