- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
上下文
我正在使用一个流管道,它在 pubsub 中有一个 protobuf 数据源。我希望将此 protobuf 解析为 python 字典,因为数据接收器要求输入是字典的集合。我通过初始化process
中的protobuf消息成功开发了一个Protobuf解析器。 DoFn 的函数。
但是,我想知道,是否可以在 Beam 上制作通用的 ProtobufParser DoFn?从工程角度来看,通用 DoFn 非常有用,可以避免重新实现现有功能并实现代码重用。在 Java 中,我知道我们可以使用泛型,因此在 Java 中实现这个泛型 ProtobufParser 相对容易。由于Python函数是一等对象,我在想是否可以将Protobuf模式类(而不是消息实例对象)传递到DoFn中。我尝试这样做,但一直失败。
下面是我当前成功的 protobuf 解析器。 protobuf消息在process
内部初始化功能。
class ParsePubSubProtoToDict(beam.DoFn):
def process(self, element, *args, **kwargs):
from datapipes.protos.data_pb2 import DataSchema
from google.protobuf.json_format import MessageToDict
message = DataSchema()
message.ParseFromString(element)
obj = MessageToDict(message, preserving_proto_field_name=True)
yield obj
虽然上述 Protobuf DoFn 解析器可以正常工作,但它并不能推广到所有 protobuf 模式,因此这将导致需要为不同的 protobuf 模式重新实现新的 DoFn 解析器。
为了使解析器可推广到所有 protobuf 模式,我尝试将 protobuf 模式(在 Python 中作为类生成)传递给 DoFn。
class ParsePubSubProtoToDict(beam.DoFn):
def __init__(self, proto_class):
self.proto_class = proto_class
def process(self, element, *args, **kwargs):
from google.protobuf.json_format import MessageToDict
message = self.proto_class()
message.ParseFromString(element)
obj = MessageToDict(message, preserving_proto_field_name=True)
yield obj
def run_pubsub_to_gbq_pipeline(argv):
...
from datapipes.protos import data_pb2
with beam.Pipeline(options=options) as p:
(p |
'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=pubsub_config["subscription"]) |
'Proto to JSON' >> beam.ParDo(ParsePubSubProtoToDict(data_pb2.DataSchema().__class__)) |
'Print Result' >> beam.Map(lambda x: print_data(x))
和其他类似的技术,但是,我的所有尝试都失败并出现相同的错误消息: pickle.PicklingError: Can't pickle <class 'data_pb2.DataSchema'>: it's not found as data_pb2.DataSchema
根据此错误消息,我对问题发生的原因有两种假设:
Protobuf 架构类不可序列化。然而,这个假设可能是错误的,因为虽然我知道pickle
如果我使用 dill
,则无法序列化 protobuf 模式,我能够序列化 protobuf 模式。但除此之外,我仍然有点不确定Python Beam中的DoFn如何实现序列化(例如:当它使用 dill
或 pickle
来序列化事物时,对象的序列化格式是什么,以使其可序列化并兼容与 DoFn 等)
DoFn 类中导入错误。由于函数/类作用域和数据流工作人员的原因,我在 python Beam 中遇到了几个导入错误问题,为了解决这个问题,我必须在需要的函数中本地导入包,而不是在模块中全局导入包。那么也许,如果我们将 protobuf 模式类传递给 DoFn,模式导入实际上是在 DoFn 外部完成的,因此 DoFn 无法正确解析 DoFn 内部的类名?
我的问题是:
__init__
) 是可序列化的? beam 上是否有一个可序列化的类,我可以继承该类,以便我可以将不可序列化的对象转换为可序列化的?非常感谢!我们将非常感谢您的帮助。
最佳答案
我实际上找到了一种使用 beam.Map
创建通用 Protobuf 解析器的替代解决方案
def convert_proto_to_dict(data, schema_class):
message = schema_class()
if isinstance(data, (str, bytes)):
message.ParseFromString(data)
else:
message = data
return MessageToDict(message, preserving_proto_field_name=True)
def run_pubsub_to_gbq_pipeline(argv):
... options initialization
from datapipes.protos import data_pb2
with beam.Pipeline(options=options) as p:
(p |
'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=pubsub_config["subscription"]) |
'Proto to Dict' >> beam.Map(lambda x: convert_proto_to_dict(x, data_pb2.DataSchema)) |
'Print Result' >> beam.Map(lambda x: print_data(x))
首先,我创建了一个函数,它接收 protobuf 模式类和 protobuf 数据(当前为字节字符串)作为参数。该函数将初始化字符串字节数据并将其解析为protobuf消息,并将protobuf消息转换为python字典。
此函数随后由 beam.Map
使用,因此现在我能够在没有 beam.DoFn
的情况下在 Beam 上开发通用的 Protobuf 解析器。但是,我仍然很好奇为什么 protobuf 模式类在与 DoFn 一起使用时会出现问题,所以如果您知道原因以及如何解决这个问题,请在这里分享您的答案,谢谢!
关于python - 如何在 python Beam 中制作通用的 Protobuf 解析器 DoFn?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55869110/
我一直在使用 AJAX 从我正在创建的网络服务中解析 JSON 数组时遇到问题。我的前端是一个简单的 ajax 和 jquery 组合,用于显示从我正在创建的网络服务返回的结果。 尽管知道我的数据库查
很难说出这里要问什么。这个问题模棱两可、含糊不清、不完整、过于宽泛或夸夸其谈,无法以目前的形式得到合理的回答。如需帮助澄清此问题以便重新打开,visit the help center . 关闭 1
我在尝试运行 Android 应用程序时遇到问题并收到以下错误 java.lang.NoClassDefFoundError: com.parse.Parse 当我尝试运行该应用时。 最佳答案 在这
有什么办法可以防止etree在解析HTML内容时解析HTML实体吗? html = etree.HTML('&') html.find('.//body').text 这给了我 '&' 但我想
我有一个有点疯狂的例子,但对于那些 JavaScript 函数作用域专家来说,它看起来是一个很好的练习: (function (global) { // our module number one
关闭。此题需要details or clarity 。目前不接受答案。 想要改进这个问题吗?通过 editing this post 添加详细信息并澄清问题. 已关闭 8 年前。 Improve th
我需要编写一个脚本来获取链接并解析链接页面的 HTML 以提取标题和其他一些数据,例如可能是简短的描述,就像您链接到 Facebook 上的内容一样。 当用户向站点添加链接时将调用它,因此在客户端启动
在 VS Code 中本地开发时,包解析为 C:/Users//AppData/Local/Microsoft/TypeScript/3.5/node_modules/@types//index而不是
我在将 json 从 php 解析为 javascript 时遇到问题 这是我的示例代码: //function MethodAjax = function (wsFile, param) {
我在将 json 从 php 解析为 javascript 时遇到问题 这是我的示例代码: //function MethodAjax = function (wsFile, param) {
我被赋予了将一种语言“翻译”成另一种语言的工作。对于使用正则表达式的简单逐行方法来说,源代码过于灵活(复杂)。我在哪里可以了解更多关于词法分析和解析器的信息? 最佳答案 如果你想对这个主题产生“情绪化
您好,我在解析此文本时遇到问题 { { { {[system1];1;1;0.612509325}; {[system2];1;
我正在为 adobe after effects 在 extendscript 中编写一些代码,最终变成了 javascript。 我有一个数组,我想只搜索单词“assemble”并返回整个 jc3_
我有这段代码: $(document).ready(function() { // }); 问题:FB_RequireFeatures block 外部的代码先于其内部的代码执行。因此 who
背景: netcore项目中有些服务是在通过中间件来通信的,比如orleans组件。它里面服务和客户端会指定网关和端口,我们只需要开放客户端给外界,服务端关闭端口。相当于去掉host,这样省掉了些
1.首先贴上我试验成功的代码 复制代码 代码如下: protected void onMeasure(int widthMeasureSpec, int heightMeasureSpec)
什么是 XML? XML 指可扩展标记语言(eXtensible Markup Language),标准通用标记语言的子集,是一种用于标记电子文件使其具有结构性的标记语言。 你可以通过本站学习 X
【PHP代码】 复制代码 代码如下: $stmt = mssql_init('P__Global_Test', $conn) or die("initialize sto
在SQL查询分析器执行以下代码就可以了。 复制代码代码如下: declare @t varchar(255),@c varchar(255) declare table_cursor curs
前言 最近练习了一些前端算法题,现在做个总结,以下题目都是个人写法,并不是标准答案,如有错误欢迎指出,有对某道题有新的想法的友友也可以在评论区发表想法,互相学习🤭 题目 题目一: 二维数组中的
我是一名优秀的程序员,十分优秀!