- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我有一个可以在本地执行而不会出现任何错误的管道。我曾经在本地运行的管道中遇到此错误
'Clients have non-trivial state that is local and unpickleable.'
PicklingError: Pickling client objects is explicitly not supported.
我相信我通过降级到 apache-beam=2.3.0 来解决这个问题然后在本地它会完美运行。
现在我正在使用 DataflowRunner 并且在 requirements.txt 文件中我有以下依赖项
apache-beam==2.3.0
google-cloud-bigquery==1.1.0
google-cloud-core==0.28.1
google-cloud-datastore==1.6.0
google-cloud-storage==1.10.0
protobuf==3.5.2.post1
pytz==2013.7
但是我又遇到了这个可怕的错误
'Clients have non-trivial state that is local and unpickleable.'
PicklingError: Pickling client objects is explicitly not supported.
为什么给我的是 DataflowRunner 错误而不是 DirectRunner 错误?他们不应该使用相同的依赖项/环境吗?任何帮助,将不胜感激。
我读到这是解决问题的方法,但当我尝试时,我仍然遇到同样的错误
class MyDoFn(beam.DoFn):
def start_bundle(self, process_context):
self._dsclient = datastore.Client()
def process(self, context, *args, **kwargs):
# do stuff with self._dsclient
来自 https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3191
我之前在本地修复此问题的引用帖子:
Using start_bundle() in apache-beam job not working. Unpickleable storage.Client()
提前致谢!
最佳答案
在 start_bundle
方法中初始化 unpickleable 客户端是一种正确的方法,Beam IO 通常遵循这一点,参见 datastoreio.py举个例子。这是一个在 DoFn 中使用 GCS python 客户端执行简单操作的管道。我在 Apache Beam 2.16.0 上运行它没有问题。如果您仍然可以重现您的问题,请提供更多详细信息。
gcs_client.py 文件:
import argparse
import logging
import time
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import storage
class MyDoFn(beam.DoFn):
def start_bundle(self):
self.storage_client = storage.Client()
def process(self, element):
bucket = self.storage_client.get_bucket("existing-gcs-bucket")
blob = bucket.blob(str(int(time.time())))
blob.upload_from_string("payload")
return element
logging.getLogger().setLevel(logging.INFO)
_, options = argparse.ArgumentParser().parse_known_args()
pipeline_options = PipelineOptions(options)
p = beam.Pipeline(options=pipeline_options)
_ = p | beam.Create([None]) | beam.ParDo(MyDoFn())
p.run().wait_until_finish()
requirements.txt 文件:
google-cloud-storage==1.23.0
命令行:
python -m gcs_client \
--project=insert_your_project \
--runner=DataflowRunner \
--temp_location gs://existing-gcs-bucket/temp/ \
--requirements_file=requirements.txt \
--save_main_session
关于python - 数据流错误 : 'Clients have non-trivial state that is local and unpickleable' ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50611890/
我被要求编写一个递归代码来打印一个数组。一位 friend 向我展示了这段代码: include int i=0; void print(int A[], int n) { if(i voi
我正在尝试让 google 的示例(应用内购买 version 3)测试应用“TrivialDrive”正常工作,但遇到以下问题: 我点击“buy gas”按钮,出现以下屏幕: 此消息有两种可能的解释
我知道这是一个非常常见的主题,但尽管典型的 UB 很容易找到,但到目前为止我还没有找到这个变体。 因此,我尝试正式引入 Pixel 对象,同时避免数据的实际拷贝。 这有效吗? struct Pixel
当一个函数接受一个函数参数(或者一个类有一个函数槽)时,有两种方法可供选择: def foo(..., my_func=None, ...): ... if my_func:
你知道任何 git 命令的工具/开关可以帮助我防止冲突或加速 merge 吗? 目前我经常遇到这样的“琐碎的冲突”: ++>>>>>> 587f917... 我想这是由一些空格引起的。这很容易被人类解
我在国际象棋引擎上工作了一段时间。为了改进引擎,我编写了一些代码,将国际象棋位置从内存加载到一些调谐器代码中。我的机器上大约有 1.85B fens,加起来达到 40Gb(每个位置 24B)。 加载后
我想在 Spark 中加入两个数据集。这就是我所做的: Dataset data = spark.read().format("parquet").load("hdfs://path"); Datas
我在国际象棋引擎上工作了一段时间。为了改进引擎,我编写了一些代码,将国际象棋位置从内存加载到一些调谐器代码中。我的机器上大约有 1.85B fens,加起来达到 40Gb(每个位置 24B)。 加载后
根据我对标准的理解,普通析构函数是一种隐式声明的析构函数,其类只有基本和非静态成员以及普通析构函数。鉴于此定义的递归性,在我看来,唯一的“递归停止”条件是找到具有非隐式声明的析构函数(即用户声明的)的
在 C++0x 中,我想确定一个类是否简单/是否具有标准布局,以便我可以使用 memcpy()、memset() 等... 我应该如何使用 type_traits 实现下面的代码,这样我才能确认一个类
我有一个带有“日期”列的表,我想做一个执行以下操作的查询: 如果日期是 星期一 , 周二 , 周三 , 或 星期四 , 显示的日期应该上移 1 天,如 DATEADD(day, 1, [Date])
我有一个非常复杂的项目(大约 100 个模块),我想在它上面运行 mvn dependency:tree .它失败了,提示它无法解决的依赖关系。该项目否则编译得很好。所以我创建了我能想到的最基本的项目
我希望这会返回一个 Date 对象,表示从现在开始一小时后的时间: Calendar.current.date(byAdding: DateComponents(hour: 1), to: Date(
Blocksworld显然是自动化规划中的基准领域。 This domain consists of a set of blocks, a table and a robot hand. The bl
我是 Angular 新手,我想做一些重要的输入验证。 基本上我有一张 table 。每行包含三个文本输入。当用户输入任何文本输入时,我想检查该表是否至少包含一行具有三个非空白输入字段。如果是的话
我无法理解导致索引错误的原因,而不是寻找快速修复。但是,如果我的代码让您反感/非常无效,请告诉我。目标是生成由两个四位数的乘积产生的回文。 代码: for x in range(10000):
我有一些数据需要多个 Activity 操作。基本上有一个只读屏幕和多个编辑屏幕。 起初我考虑将数据作为字符串参数传递给 Intent,但如果用户在编辑字段后按下后退按钮,这些更改将会丢失。 那么在不
我会定义“平凡可 move ” Calling the move constructor (or the move assignment operator) is equivalent to memc
我只想 class Trivial t instance Trivial t 这在 Haskell 98 中当然没用,因为你可以忽略约束;但使用 ConstraintKinds,我们可以明确要求类型为
在 future 的 C++ 标准中,我们将拥有“平凡的可重定位性”的概念,这意味着我们可以简单地将字节从一个对象复制到未初始化的内存块,并简单地忽略/清零原始对象的字节。 这样,我们就模仿了 C 风
我是一名优秀的程序员,十分优秀!