- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
上下文
我正在使用一个流管道,它在 pubsub 中有一个 protobuf 数据源。我希望将此 protobuf 解析为 python 字典,因为数据接收器要求输入是字典的集合。我通过初始化process
中的protobuf消息成功开发了一个Protobuf解析器。 DoFn 的函数。
但是,我想知道,是否可以在 Beam 上制作通用的 ProtobufParser DoFn?从工程角度来看,通用 DoFn 非常有用,可以避免重新实现现有功能并实现代码重用。在 Java 中,我知道我们可以使用泛型,因此在 Java 中实现这个泛型 ProtobufParser 相对容易。由于Python函数是一等对象,我在想是否可以将Protobuf模式类(而不是消息实例对象)传递到DoFn中。我尝试这样做,但一直失败。
下面是我当前成功的 protobuf 解析器。 protobuf消息在process
内部初始化功能。
class ParsePubSubProtoToDict(beam.DoFn):
def process(self, element, *args, **kwargs):
from datapipes.protos.data_pb2 import DataSchema
from google.protobuf.json_format import MessageToDict
message = DataSchema()
message.ParseFromString(element)
obj = MessageToDict(message, preserving_proto_field_name=True)
yield obj
虽然上述 Protobuf DoFn 解析器可以正常工作,但它并不能推广到所有 protobuf 模式,因此这将导致需要为不同的 protobuf 模式重新实现新的 DoFn 解析器。
为了使解析器可推广到所有 protobuf 模式,我尝试将 protobuf 模式(在 Python 中作为类生成)传递给 DoFn。
class ParsePubSubProtoToDict(beam.DoFn):
def __init__(self, proto_class):
self.proto_class = proto_class
def process(self, element, *args, **kwargs):
from google.protobuf.json_format import MessageToDict
message = self.proto_class()
message.ParseFromString(element)
obj = MessageToDict(message, preserving_proto_field_name=True)
yield obj
def run_pubsub_to_gbq_pipeline(argv):
...
from datapipes.protos import data_pb2
with beam.Pipeline(options=options) as p:
(p |
'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=pubsub_config["subscription"]) |
'Proto to JSON' >> beam.ParDo(ParsePubSubProtoToDict(data_pb2.DataSchema().__class__)) |
'Print Result' >> beam.Map(lambda x: print_data(x))
和其他类似的技术,但是,我的所有尝试都失败并出现相同的错误消息: pickle.PicklingError: Can't pickle <class 'data_pb2.DataSchema'>: it's not found as data_pb2.DataSchema
根据此错误消息,我对问题发生的原因有两种假设:
Protobuf 架构类不可序列化。然而,这个假设可能是错误的,因为虽然我知道pickle
如果我使用 dill
,则无法序列化 protobuf 模式,我能够序列化 protobuf 模式。但除此之外,我仍然有点不确定Python Beam中的DoFn如何实现序列化(例如:当它使用 dill
或 pickle
来序列化事物时,对象的序列化格式是什么,以使其可序列化并兼容与 DoFn 等)
DoFn 类中导入错误。由于函数/类作用域和数据流工作人员的原因,我在 python Beam 中遇到了几个导入错误问题,为了解决这个问题,我必须在需要的函数中本地导入包,而不是在模块中全局导入包。那么也许,如果我们将 protobuf 模式类传递给 DoFn,模式导入实际上是在 DoFn 外部完成的,因此 DoFn 无法正确解析 DoFn 内部的类名?
我的问题是:
__init__
) 是可序列化的? beam 上是否有一个可序列化的类,我可以继承该类,以便我可以将不可序列化的对象转换为可序列化的?非常感谢!我们将非常感谢您的帮助。
最佳答案
我实际上找到了一种使用 beam.Map
创建通用 Protobuf 解析器的替代解决方案
def convert_proto_to_dict(data, schema_class):
message = schema_class()
if isinstance(data, (str, bytes)):
message.ParseFromString(data)
else:
message = data
return MessageToDict(message, preserving_proto_field_name=True)
def run_pubsub_to_gbq_pipeline(argv):
... options initialization
from datapipes.protos import data_pb2
with beam.Pipeline(options=options) as p:
(p |
'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=pubsub_config["subscription"]) |
'Proto to Dict' >> beam.Map(lambda x: convert_proto_to_dict(x, data_pb2.DataSchema)) |
'Print Result' >> beam.Map(lambda x: print_data(x))
首先,我创建了一个函数,它接收 protobuf 模式类和 protobuf 数据(当前为字节字符串)作为参数。该函数将初始化字符串字节数据并将其解析为protobuf消息,并将protobuf消息转换为python字典。
此函数随后由 beam.Map
使用,因此现在我能够在没有 beam.DoFn
的情况下在 Beam 上开发通用的 Protobuf 解析器。但是,我仍然很好奇为什么 protobuf 模式类在与 DoFn 一起使用时会出现问题,所以如果您知道原因以及如何解决这个问题,请在这里分享您的答案,谢谢!
关于python - 如何在 python Beam 中制作通用的 Protobuf 解析器 DoFn?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55869110/
我试图了解是否有可能采用构成另一个 protobuf 一部分的序列化 protobuf 并将它们合并在一起而不必反序列化第一个 protobuf。 例如,给定一个 protobuf 包装器: synt
正如我最近发现的,我们可以使用两个类 ProtoBuf.Serializer 和 ProtoBuf.Meta.TypeModel 在 protobuf-net 中序列化/反序列化。例如,假设我们有一个
我正在尝试使用 protobuf 序列化我的下面的类,但它因“对象引用”错误而失败。更多详情如下。通过查看错误详细信息,您知道会出现什么问题吗?注意:我的用户对象太大了,它有太多的子对象和属性。所以不
我想识别要反序列化的文件是否是protobuf序列化的。这是因为我想提供不止一种选项来为用户序列化和反序列化文件。我正在使用 protobuf.net 序列化为 protobuf 格式。 最佳答案 不
我已经使用位于 https://protogen.marcgravell.com/ 的工具构建了我的 C# 类来自 https://developers.google.com/transit/gtfs
有一个通过 UDP 接受消息的 Go 服务器。使用这种设计,它只能扫描一种特定类型的实体,world.Entity . for { buf := make([]byte, 10
比如我想序列化和反序列化System.Drawing.Font这是不可变的,不能更改以适应 protobuf-net 约定。一般来说,是否可以在 protobuf-net 中编写某种“自定义”序列化程
我开始用 protobuf 2.2.0 构建一个应用程序,因为它是最新的。现在我正在考虑升级到最新的 protobuf 2.4.0a。 如果我这样做,对于同一架构,一个版本的应用程序生成的消息是否仍然
在我从 BinaryFormatter 切换到 protobuf-net 的过程中, 我在序列化集合时观察到了差异。 在下面的代码示例中,反序列化(protobuf-net v2r470)返回 如果在
知道正在发送的 protobuf 消息类型的 API 是什么? 例如,我使用以下方法获取 SendNameMessage 对象。 SendNameMessage sendNameObj = Seria
我在我们的一个项目中使用 protobuf-net 来序列化/反序列化一大组同类对象。它运行良好,速度非常快。不过只有一个问题。反序列化时是否可以使用 linq(或任何其他机制)指定过滤条件,以便加载
我正在尝试使用 protobuf-net 序列化一些对象,但不幸的是他们自由地使用了 DateTimeOffset , protobuf-net 尚不支持。这导致了很多: No serializer
我在 ionic2 项目中使用 protobuf.js。我有一个有效的 .proto 文件,我首先将其转换为静态 javascript 文件: pbjs -t static databaseapi.p
我通过 vcpkg vcpkg install protobuf:x64-windows 安装了 protobuf .显然它安装了最新版本(3.6.1)。对于我需要版本<=3.5.1的项目。有没有办法
我有以下类(class):- [Serializable] [DataContract(Name = "StateValueWrapper")] public class StateValueWrap
protobuf net 似乎不支持列表/数组的 AsReference 以及列表/数组内对象的 AsReference。这会在最终的 v2 中得到支持吗? [ProtoMember(1, AsRef
我正在使用 protobuf-net 来序列化和反序列化我的消息。我的消息还包含可以为空的字符串。但是,当我在另一侧反序列化它们时,我得到空字符串 ("")。 根据谷歌文档,空字符串中字符串类型的默认
我已经阅读了有关继承的各种帖子,并且 Protocol Buffer 不支持继承。我不想继承 Protocol Buffers 消息,而是继承,这样我就可以轻松处理我的所有 Protocol Buff
我知道带有 protobuf.net 的列表不支持 AsReference,因此我尝试了解决此限制的方法。我创建了一个名为 SuperList 的自定义列表,其中包含包装在 SuperListItem
我正在尝试使用 ProtoMember 中的 AsReference 选项进行递归引用。如果我使用公共(public)构造函数创建 AnOwner 然后序列化/反序列化,AnOwner.Data 变为
我是一名优秀的程序员,十分优秀!