gpt4 book ai didi

python - 为什么自定义 Python 对象不能与 ParDo Fn 一起使用?

转载 作者:行者123 更新时间:2023-11-28 22:11:19 58 4
gpt4 key购买 nike

我目前不熟悉在 Python 中将 Apache Beam 与数据流运行器结合使用。我对创建发布到 Google Cloud PubSub 的批处理管道很感兴趣,我修改了 Beam Python API 并找到了解决方案。然而,在探索过程中,我遇到了一些有趣的问题,这让我很好奇。

1。成功的管道

目前,我从 GCS 以批量方式发布数据的成功 Beam 管道如下所示:

class PublishFn(beam.DoFn):
def __init__(self, topic_path):
self.topic_path = topic_path
super(self.__class__, self).__init__()

def process(self, element, **kwargs):
from google.cloud import pubsub_v1
publisher = pubsub_v1.PublisherClient()
future = publisher.publish(self.topic_path, data=element.encode("utf-8"))
return future.result()


def run_gcs_to_pubsub(argv):
options = PipelineOptions(flags=argv)

from datapipes.common.dataflow_utils import CsvFileSource
from datapipes.protos import proto_schemas_pb2
from google.protobuf.json_format import MessageToJson

with beam.Pipeline(options=options) as p:
normalized_data = (
p |
"Read CSV from GCS" >> beam.io.Read(CsvFileSource(
"gs://bucket/path/to/file.csv")) |
"Normalize to Proto Schema" >> beam.Map(
lambda data: MessageToJson(
proto_schemas_pb2(data, proto_schemas_pb2.MySchema()),
indent=0,
preserving_proto_field_name=True)
)
)
(normalized_data |
"Write to PubSub" >> beam.ParDo(
PublishFn(topic_path="projects/my-gcp-project/topics/mytopic"))
)

2。不成功的管道

在这里,我试图让发布者在 DoFn 中共享。我尝试了以下方法。

一个。在 DoFn 中初始化发布者

class PublishFn(beam.DoFn):
def __init__(self, topic_path):
from google.cloud import pubsub_v1

batch_settings = pubsub_v1.types.BatchSettings(
max_bytes=1024, # One kilobyte
max_latency=1, # One second
)
self.publisher = pubsub_v1.PublisherClient(batch_settings)
self.topic_path = topic_path
super(self.__class__, self).__init__()

def process(self, element, **kwargs):
future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
return future.result()

def run_gcs_to_pubsub(argv):
... ## same as 1

b。在 DoFn 之外初始化 Publisher,并将其传递给 DoFn

class PublishFn(beam.DoFn):
def __init__(self, publisher, topic_path):
self.publisher = publisher
self.topic_path = topic_path
super(self.__class__, self).__init__()

def process(self, element, **kwargs):
future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
return future.result()


def run_gcs_to_pubsub(argv):
.... ## same as 1

batch_settings = pubsub_v1.types.BatchSettings(
max_bytes=1024, # One kilobyte
max_latency=1, # One second
)
publisher = pubsub_v1.PublisherClient(batch_settings)

with beam.Pipeline(options=options) as p:
... # same as 1
(normalized_data |
"Write to PubSub" >> beam.ParDo(
PublishFn(publisher=publisher, topic_path="projects/my-gcp-project/topics/mytopic"))
)

使发布者在 DoFn 方法之间共享的两次尝试均失败,并显示以下错误消息:

  File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__

  File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__

我的问题是:

  1. 共享发布器实现是否会提高波束管道性能?如果是,那么我想探索这个解决方案。

  2. 为什么我的失败管道会出现错误?是否由于在 process 函数之外初始化自定义类对象并将其传递给 DoFn?如果是因为这个原因,我该如何实现一个管道,以便能够在 DoFn 中重用自定义对象?

谢谢,非常感谢您的帮助。

编辑:解决方案

好的,Ankur 已经解释了我的问题出现的原因,并讨论了如何在 DoFn 上完成序列化。基于这些知识,我现在了解到在 DoFn 中有两种使自定义对象共享/可重用的解决方案:

  1. 使自定义对象可序列化:这允许对象在 DoFn 对象创建期间初始化/可用(在 __init__ 下)。该对象必须是可序列化的,因为它将在管道提交期间被序列化,在管道提交期间将创建 DoFn 对象(调用 __init__)。我的回答在下面回答了如何实现这一点。此外,我发现此要求实际上与 [1][2] 下的 Beam 文档相关联。

  2. __init__ 之外的 DoFn 函数中初始化不可序列化的对象以避免序列化,因为在管道提交期间不会调用 init 之外的函数。 Ankur 的回答中解释了如何完成此操作。

引用资料:

[1] https://beam.apache.org/documentation/programming-guide/#core-beam-transforms

[2] https://beam.apache.org/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms

最佳答案

PublisherClient 无法正确 pickle。更多关于酸洗 here .在 process 方法中初始化 PublisherClient 避免了 PublisherClient 的 pickle。

如果目的是重用 PublisherClient,我建议在 process 方法中初始化 PublisherClient 并使用以下代码将其存储在 self 中.

class PublishFn(beam.DoFn):
def __init__(self, topic_path):
self.topic_path = topic_path
super(self.__class__, self).__init__()

def process(self, element, **kwargs):
if not hasattr(self, 'publish'):
from google.cloud import pubsub_v1
self.publisher = pubsub_v1.PublisherClient()
future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
return future.result()

关于python - 为什么自定义 Python 对象不能与 ParDo Fn 一起使用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55822881/

58 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com