gpt4 book ai didi

python - 如何在 python Beam 中制作通用的 Protobuf 解析器 DoFn?

转载 作者:行者123 更新时间:2023-12-01 07:57:56 34 4
gpt4 key购买 nike

上下文
我正在使用一个流管道,它在 pubsub 中有一个 protobuf 数据源。我希望将此 protobuf 解析为 python 字典,因为数据接收器要求输入是字典的集合。我通过初始化process中的protobuf消息成功开发了一个Protobuf解析器。 DoFn 的函数。

为什么需要通用 Protobuf 解析器

但是,我想知道,是否可以在 Beam 上制作通用的 ProtobufParser DoFn?从工程角度来看,通用 DoFn 非常有用,可以避免重新实现现有功能并实现代码重用。在 Java 中,我知道我们可以使用泛型,因此在 Java 中实现这个泛型 ProtobufParser 相对容易。由于Python函数是一等对象,我在想是否可以将Protobuf模式类(而不是消息实例对象)传递到DoFn中。我尝试这样做,但一直失败。

成功的解析器,但有警告:不可推广

下面是我当前成功的 protobuf 解析器。 protobuf消息在process内部初始化功能。

class ParsePubSubProtoToDict(beam.DoFn):

def process(self, element, *args, **kwargs):
from datapipes.protos.data_pb2 import DataSchema
from google.protobuf.json_format import MessageToDict

message = DataSchema()
message.ParseFromString(element)

obj = MessageToDict(message, preserving_proto_field_name=True)

yield obj

虽然上述 Protobuf DoFn 解析器可以正常工作,但它并不能推广到所有 protobuf 模式,因此这将导致需要为不同的 protobuf 模式重新实现新的 DoFn 解析器。

我的尝试

为了使解析器可推广到所有 protobuf 模式,我尝试将 protobuf 模式(在 Python 中作为类生成)传递给 DoFn。

class ParsePubSubProtoToDict(beam.DoFn):
def __init__(self, proto_class):
self.proto_class = proto_class

def process(self, element, *args, **kwargs):
from google.protobuf.json_format import MessageToDict

message = self.proto_class()
message.ParseFromString(element)
obj = MessageToDict(message, preserving_proto_field_name=True)

yield obj


def run_pubsub_to_gbq_pipeline(argv):
...
from datapipes.protos import data_pb2

with beam.Pipeline(options=options) as p:
(p |
'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=pubsub_config["subscription"]) |
'Proto to JSON' >> beam.ParDo(ParsePubSubProtoToDict(data_pb2.DataSchema().__class__)) |
'Print Result' >> beam.Map(lambda x: print_data(x))

和其他类似的技术,但是,我的所有尝试都失败并出现相同的错误消息: pickle.PicklingError: Can't pickle <class 'data_pb2.DataSchema'>: it's not found as data_pb2.DataSchema

根据此错误消息,我对问题发生的原因有两种假设:

  1. Protobuf 架构类不可序列化。然而,这个假设可能是错误的,因为虽然我知道pickle如果我使用 dill ,则无法序列化 protobuf 模式,我能够序列化 protobuf 模式。但除此之外,我仍然有点不确定Python Beam中的DoFn如何实现序列化(例如:当它使用 dillpickle 来序列化事物时,对象的序列化格式是什么,以使其可序列化并兼容与 DoFn 等)

  2. DoFn 类中导入错误。由于函数/类作用域和数据流工作人员的原因,我在 python Beam 中遇到了几个导入错误问题,为了解决这个问题,我必须在需要的函数中本地导入包,而不是在模块中全局导入包。那么也许,如果我们将 protobuf 模式类传递给 DoFn,模式导入实际上是在 DoFn 外部完成的,因此 DoFn 无法正确解析 DoFn 内部的类名?

<小时/>

我的问题是:

  1. 为什么会出现此错误?如何解决此错误?
  2. 是否可以传递 protobuf 模式类?或者是否有更好的方法来实现 python 字典解析器 DoFn 的通用 protobuf 而不将 protobuf 模式类传递给 DoFn?
  3. Python 中的 DoFn 是如何工作的,如何确保传递给 DoFn 创建的对象 ( __init__ ) 是可序列化的? beam 上是否有一个可序列化的类,我可以继承该类,以便我可以将不可序列化的对象转换为可序列化的?

非常感谢!我们将非常感谢您的帮助。

最佳答案

我实际上找到了一种使用 beam.Map 创建通用 Protobuf 解析器的替代解决方案

def convert_proto_to_dict(data, schema_class):
message = schema_class()

if isinstance(data, (str, bytes)):
message.ParseFromString(data)
else:
message = data

return MessageToDict(message, preserving_proto_field_name=True)


def run_pubsub_to_gbq_pipeline(argv):
... options initialization
from datapipes.protos import data_pb2

with beam.Pipeline(options=options) as p:
(p |
'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=pubsub_config["subscription"]) |
'Proto to Dict' >> beam.Map(lambda x: convert_proto_to_dict(x, data_pb2.DataSchema)) |
'Print Result' >> beam.Map(lambda x: print_data(x))

首先,我创建了一个函数,它接收 protobuf 模式类和 protobuf 数据(当前为字节字符串)作为参数。该函数将初始化字符串字节数据并将其解析为protobuf消息,并将protobuf消息转换为python字典。

此函数随后由 beam.Map 使用,因此现在我能够在没有 beam.DoFn 的情况下在 Beam 上开发通用的 Protobuf 解析器。但是,我仍然很好奇为什么 protobuf 模式类在与 DoFn 一起使用时会出现问题,所以如果您知道原因以及如何解决这个问题,请在这里分享您的答案,谢谢!

关于python - 如何在 python Beam 中制作通用的 Protobuf 解析器 DoFn?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55869110/

34 4 0
文章推荐: python - 在python循环中获取超链接和文本
文章推荐: java - 将 List 转换为列表字符串