I can run this code with DirectRunner and it works fine. With DataflowRunner it crashes:
TypeError: process() takes exactly 4 arguments (3 given) [while running 'write_text/Write/WriteImpl/WriteBundles']
My apache-beam-sdk was cloned from master and built as described in the instructions; it builds as apache-beam-sdk==0.6.0.dev0. I'm skeptical of that version number, though, because I believe I've recently seen code changes land without a version bump (NewDoFn disappeared, yet the version stayed the same).
I'm not sure whether this is the root of the problem, but the installed SDK and the Dataflow containers appear to be mismatched. I've hit another error of the same mismatched-signature kind, where DirectRunner passes the element directly to my DoFn.process() while DataflowRunner passes a context.
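The mismatch can be illustrated with a minimal, hypothetical sketch (these are not real Beam classes; the names `sink` and `writer` are made up). A DoFn written for the new API defines process(self, element, ...), while the old worker path calls fn.process(context, *args) against the old process(self, context, ...) convention, arriving one positional argument short:

```python
class WriteBundlesDoFn(object):
    """New-style DoFn: the runner is expected to pass the element itself."""
    def process(self, element, sink, writer):
        # Illustrative body only; just echo the arguments back.
        return [(sink, writer, element)]

def old_style_invoke(fn, context, *args):
    """Mimics the old worker path: lambda context: fn.process(context, *args)."""
    return fn.process(context, *args)

fn = WriteBundlesDoFn()
try:
    # The old worker supplies (context, sink): 3 arguments counting self,
    # but the new-style method wants 4 -- hence the TypeError above.
    old_style_invoke(fn, 'ctx', 'sink')
except TypeError as exc:
    print('TypeError: %s' % exc)
```

Called with the full argument list the method works; called through the old one-argument-short path it raises the TypeError seen in the stack trace.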
I've tried to isolate this down to the simplest code I can:
import uuid
import apache_beam.utils.pipeline_options
import apache_beam as beam
runner = 'DataflowRunner'
# runner = 'DirectRunner'
options = beam.utils.pipeline_options.PipelineOptions()
gcloud_options = options.view_as(beam.utils.pipeline_options.GoogleCloudOptions)
gcloud_options.job_name = 'a' + str(uuid.uuid4())
gcloud_options.project = 'your-project'
gcloud_options.staging_location = 'gs://your-bucket/beam/staging'
gcloud_options.temp_location = 'gs://your-bucket/beam/temp'
options.view_as(beam.utils.pipeline_options.StandardOptions).runner = runner
p = beam.Pipeline(options=options)
(p
| 'some_strings' >> beam.Create(tuple('asdfqwert'))
| 'write_text' >> beam.io.WriteToText('strings', file_name_suffix='.txt')
)
p.run().wait_until_finish()
Full output:
No handlers could be found for logger "oauth2client.contrib.multistore_file"
/Users/john/miniconda3/envs/py2/lib/python2.7/site-packages/apache_beam/coders/typecoders.py:136: UserWarning: Using fallback coder for typehint: Any.
warnings.warn('Using fallback coder for typehint: %r.' % typehint)
DEPRECATION: pip install --download has been deprecated and will be removed in the future. Pip now has a download command that should be used instead.
Collecting google-cloud-dataflow==0.5.1
Using cached google-cloud-dataflow-0.5.1.tar.gz
Saved /var/folders/v3/61xx4nnn6p36n5m9fp4qdwtr0000gn/T/tmpuCWoeh/google-cloud-dataflow-0.5.1.tar.gz
Successfully downloaded google-cloud-dataflow
Traceback (most recent call last):
File "reproduce_bug.py", line 28, in <module>
p.run().wait_until_finish()
File "/Users/john/miniconda3/envs/py2/lib/python2.7/site-packages/apache_beam/runners/dataflow_runner.py", line 706, in wait_until_finish
(self.state, getattr(self._runner, 'last_error_msg', None)), self)
apache_beam.runners.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
(70278eb56b40fd94): Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 514, in do_work
work_executor.execute()
File "dataflow_worker/executor.py", line 899, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:26452)
op.start()
File "dataflow_worker/executor.py", line 191, in dataflow_worker.executor.ReadOperation.start (dataflow_worker/executor.c:7575)
def start(self):
File "dataflow_worker/executor.py", line 196, in dataflow_worker.executor.ReadOperation.start (dataflow_worker/executor.c:7480)
with self.spec.source.reader() as reader:
File "dataflow_worker/executor.py", line 206, in dataflow_worker.executor.ReadOperation.start (dataflow_worker/executor.c:7425)
self.output(windowed_value)
File "dataflow_worker/executor.py", line 136, in dataflow_worker.executor.Operation.output (dataflow_worker/executor.c:5749)
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "dataflow_worker/executor.py", line 83, in dataflow_worker.executor.ConsumerSet.receive (dataflow_worker/executor.c:3884)
cython.cast(Operation, consumer).process(windowed_value)
File "dataflow_worker/executor.py", line 505, in dataflow_worker.executor.DoOperation.process (dataflow_worker/executor.c:15525)
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 163, in apache_beam.runners.common.DoFnRunner.receive (apache_beam/runners/common.c:4862)
self.process(windowed_value)
File "apache_beam/runners/common.py", line 270, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:7749)
self.reraise_augmented(exn)
File "apache_beam/runners/common.py", line 281, in apache_beam.runners.common.DoFnRunner.reraise_augmented (apache_beam/runners/common.c:8108)
raise type(exn), args, sys.exc_info()[2]
File "apache_beam/runners/common.py", line 268, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:7660)
self.old_dofn_process(element)
File "apache_beam/runners/common.py", line 173, in apache_beam.runners.common.DoFnRunner.old_dofn_process (apache_beam/runners/common.c:5182)
self._process_outputs(element, self.dofn_process(self.context))
File "apache_beam/runners/common.py", line 152, in apache_beam.runners.common.DoFnRunner.__init__.lambda3 (apache_beam/runners/common.c:3640)
self.dofn_process = lambda context: fn.process(context, *args)
TypeError: process() takes exactly 4 arguments (3 given) [while running 'write_text/Write/WriteImpl/WriteBundles']
Best answer
Your environment appears to have version 0.5.1 installed (see the "Collecting google-cloud-dataflow==0.5.1" line above the stack trace), but you are building against the HEAD of the Python repository.
You can either create a new virtualenv with the correct released version of the SDK (pip install google-cloud-dataflow) and then run your pipeline normally, or point the runner at your locally built SDK with the sdk_location flag. Note that using a released version is preferable.
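A minimal sketch of the two fixes, assuming the worker containers run 0.5.1 (the version shown in the pip output above); the tarball path in the second variant is hypothetical:

```shell
# Option 1: run against the released SDK that matches the workers.
virtualenv beam-env && source beam-env/bin/activate
pip install google-cloud-dataflow==0.5.1
python reproduce_bug.py

# Option 2: stage your locally built SDK tarball instead
# (the dist/ path is a hypothetical example).
python reproduce_bug.py --sdk_location dist/apache-beam-0.6.0.dev0.tar.gz
```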
About "python - WriteToText works in DirectRunner but fails with a TypeError in DataflowRunner", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/42189550/