
python - Dataflow error: 'Clients have non-trivial state that is local and unpickleable'


I have a pipeline that I can execute locally without any errors. I used to get this error in my locally run pipeline:

    'Clients have non-trivial state that is local and unpickleable.'
    PicklingError: Pickling client objects is explicitly not supported.

I believe I fixed that by downgrading to apache-beam=2.3.0, and locally it then runs perfectly.

Now I'm using the DataflowRunner, and in the requirements.txt file I have the following dependencies:

    apache-beam==2.3.0
    google-cloud-bigquery==1.1.0
    google-cloud-core==0.28.1
    google-cloud-datastore==1.6.0
    google-cloud-storage==1.10.0
    protobuf==3.5.2.post1
    pytz==2013.7

But I'm getting this dreaded error again:

    'Clients have non-trivial state that is local and unpickleable.'
    PicklingError: Pickling client objects is explicitly not supported.

Why does it give me the error with the DataflowRunner but not the DirectRunner? Shouldn't they be using the same dependencies/environment? Any help would be appreciated.
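For reference, the error message itself comes from the google-cloud client objects, which explicitly refuse to be serialized. A minimal sketch outside of Beam (not from the original post; it assumes google-cloud-storage is installed and default GCP credentials are configured) reproduces the same message:

    import pickle

    from google.cloud import storage

    # Build a client the same way a DoFn would; needs default GCP credentials.
    client = storage.Client()

    try:
        # Serializing the client raises, because google-cloud clients
        # explicitly forbid pickling.
        pickle.dumps(client)
    except pickle.PicklingError as err:
        print(err)  # "Pickling client objects is explicitly not supported. ..."

Whichever runner is used, the message appears whenever Beam tries to serialize a client object that was captured in the pipeline or in the main session.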

I read that this is the way to solve the problem, but when I try it I still get the same error:

    class MyDoFn(beam.DoFn):

        def start_bundle(self, process_context):
            self._dsclient = datastore.Client()

        def process(self, context, *args, **kwargs):
            # do stuff with self._dsclient

From https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3191

My referenced post about a previous local fix for this issue:

Using start_bundle() in apache-beam job not working. Unpickleable storage.Client()

Thanks in advance!

Best answer

Initializing unpicklable clients in the start_bundle method is a correct approach, and Beam IOs commonly follow it; see datastoreio.py for an example. Below is a pipeline that performs a simple operation with the GCS Python client inside a DoFn. I ran it on Apache Beam 2.16.0 without any problems. If you can still reproduce your issue, please provide more details.

The gcs_client.py file:

    import argparse
    import logging
    import time

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from google.cloud import storage


    class MyDoFn(beam.DoFn):
        def start_bundle(self):
            self.storage_client = storage.Client()

        def process(self, element):
            bucket = self.storage_client.get_bucket("existing-gcs-bucket")
            blob = bucket.blob(str(int(time.time())))
            blob.upload_from_string("payload")
            return element


    logging.getLogger().setLevel(logging.INFO)
    _, options = argparse.ArgumentParser().parse_known_args()

    pipeline_options = PipelineOptions(options)
    p = beam.Pipeline(options=pipeline_options)
    _ = p | beam.Create([None]) | beam.ParDo(MyDoFn())

    p.run().wait_until_finish()

The requirements.txt file:

    google-cloud-storage==1.23.0

Command line:

    python -m gcs_client \
        --project=insert_your_project \
        --runner=DataflowRunner \
        --temp_location gs://existing-gcs-bucket/temp/ \
        --requirements_file=requirements.txt \
        --save_main_session
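For the Datastore client from the question, the same start_bundle pattern should carry over. A sketch under that assumption (the DoFn name, the "DemoKind" kind, and the property names are illustrative, not from the answer):

    import time

    import apache_beam as beam
    from google.cloud import datastore


    class WriteToDatastoreDoFn(beam.DoFn):
        def start_bundle(self):
            # Created per bundle on the worker, so it is never pickled
            # together with the DoFn.
            self._dsclient = datastore.Client()

        def process(self, element):
            # "DemoKind" and the property names are placeholders.
            entity = datastore.Entity(key=self._dsclient.key("DemoKind"))
            entity.update({"payload": str(element), "created": int(time.time())})
            self._dsclient.put(entity)
            yield element

The important part is that the client is constructed inside start_bundle on the worker rather than being stored on the DoFn (or kept in the main session) before the job is submitted.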

Regarding python - Dataflow error: 'Clients have non-trivial state that is local and unpickleable', the original question can be found on Stack Overflow: https://stackoverflow.com/questions/50611890/
