python - Confusing error in Apache Beam's FileBasedSource when running a pipeline on Google Dataflow over more than 2.4M files on GCS

Reposted. Author: 行者123. Updated: 2023-12-04 09:25:55

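I have an Apache Beam pipeline running on Google Dataflow that reads gzip-compressed JSON data from GCS, transforms it, and loads it into Google BigQuery. The pipeline works as expected on a batch of sample data, but when I run it on the full dataset (~2.4M files) it sometimes raises a confusing error, and after the error has occurred a few times the job fails.
The error is: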

    Error message from worker: Traceback (most recent call last):
      File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 553, in apache_beam.runners.common.SimpleInvoker.invoke_process
      File "apache_beam/runners/common.py", line 1095, in apache_beam.runners.common._OutputProcessor.process_outputs
      File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filebasedsource.py", line 380, in process
        source = list(source.split(float('inf')))[0].source
    IndexError: list index out of range

    During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 647, in do_work
        work_executor.execute()
      File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute
        op.start()
      File "dataflow_worker/shuffle_operations.py", line 63, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
      File "dataflow_worker/shuffle_operations.py", line 64, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
      File "dataflow_worker/shuffle_operations.py", line 79, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
      File "dataflow_worker/shuffle_operations.py", line 80, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
      File "dataflow_worker/shuffle_operations.py", line 84, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
      File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
      File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
      File "dataflow_worker/shuffle_operations.py", line 261, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
      File "dataflow_worker/shuffle_operations.py", line 268, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
      File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
      File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
      File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/common.py", line 963, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 1030, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 553, in apache_beam.runners.common.SimpleInvoker.invoke_process
      File "apache_beam/runners/common.py", line 1122, in apache_beam.runners.common._OutputProcessor.process_outputs
      File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
      File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/common.py", line 963, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 1030, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 553, in apache_beam.runners.common.SimpleInvoker.invoke_process
      File "apache_beam/runners/common.py", line 1122, in apache_beam.runners.common._OutputProcessor.process_outputs
      File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
      File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/common.py", line 963, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 1045, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
        raise exc.with_traceback(traceback)
      File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 553, in apache_beam.runners.common.SimpleInvoker.invoke_process
      File "apache_beam/runners/common.py", line 1095, in apache_beam.runners.common._OutputProcessor.process_outputs
      File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filebasedsource.py", line 380, in process
        source = list(source.split(float('inf')))[0].source
    IndexError: list index out of range [while running 'GetData/ReadAllFiles/ReadRange']


As far as I can tell, the error occurs in the GetData stage:
    import apache_beam as beam
    from apache_beam.io.textio import ReadAllFromText

    files = (p
             | 'Init' >> beam.Create([files_pattern])
             | 'GetData' >> ReadAllFromText())
where p is the pipeline object and files_pattern is a glob of the form gs://{bucket}/{prefix}/*.json.gz.
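For context on what GetData does: ReadAllFromText expands each incoming glob into the matching files and emits their lines, and by default it infers gzip decompression from the .gz extension. The snippet below is only a local, standard-library analogue of that behaviour, not Beam code. The helper read_json_gz_lines is hypothetical, and it assumes each file holds newline-delimited JSON (one record per line), which the question implies but does not state.

```python
import glob
import gzip
import json
import os
import tempfile
from typing import Any, Dict, Iterator, List


def read_json_gz_lines(pattern: str) -> Iterator[Dict[str, Any]]:
    """Hypothetical local analogue of ReadAllFromText followed by json.loads.

    Expands a local glob such as "data/*.json.gz", gunzips every match and
    yields one parsed JSON object per non-empty line.
    """
    for path in sorted(glob.glob(pattern)):
        # "rt" opens the gzip stream in text mode and decodes it as UTF-8.
        with gzip.open(path, "rt", encoding="utf-8") as fh:
            for line in fh:
                line = line.strip()
                if line:  # skip blank lines rather than failing on them
                    yield json.loads(line)


def demo() -> List[Dict[str, Any]]:
    """Write two records to a temporary .json.gz file and read them back."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "part-00000.json.gz")
        with gzip.open(path, "wt", encoding="utf-8") as fh:
            fh.write('{"id": 1}\n{"id": 2}\n')
        return list(read_json_gz_lines(os.path.join(tmp, "*.json.gz")))


if __name__ == "__main__":
    print(demo())  # [{'id': 1}, {'id': 2}]
```

On Dataflow the same expansion happens against GCS for all ~2.4M matches, which is where the scale difference from the sample batch comes in.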
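The error puzzles me because it points only at Beam internals and says nothing about my own pipeline code, and the same pipeline works on the sample batch.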
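The last frame of the traceback is a single line inside Beam's filebasedsource.py: source = list(source.split(float('inf')))[0].source. It splits a per-file source into bundles and takes the first one; if the split yields no bundles at all, indexing [0] on the empty list raises exactly this IndexError, which is why the message mentions nothing from the user's code. The sketch below reproduces only that Python mechanism, using a hypothetical split stand-in in which a zero-length input yields no bundles; it makes no claim about which file or condition made Beam's split come back empty in this job.

```python
from typing import Iterator, Optional


def split(size_in_bytes: int, desired_bundle_size: float) -> Iterator[range]:
    """Hypothetical stand-in for a source split: yields byte-range bundles.

    With desired_bundle_size=float("inf") a non-empty input yields exactly one
    bundle, and a zero-length input yields none.
    """
    start = 0
    while start < size_in_bytes:
        end = int(min(size_in_bytes, start + desired_bundle_size))
        yield range(start, end)
        start = end


def first_bundle_unguarded(size_in_bytes: int) -> range:
    # Same shape as the failing Beam line: list(...)[0] on a possibly empty split.
    return list(split(size_in_bytes, float("inf")))[0]


def first_bundle_guarded(size_in_bytes: int) -> Optional[range]:
    # next() with a default returns None instead of raising on an empty split.
    return next(split(size_in_bytes, float("inf")), None)


if __name__ == "__main__":
    print(first_bundle_unguarded(1024))  # range(0, 1024)
    print(first_bundle_guarded(0))       # None
    try:
        first_bundle_unguarded(0)
    except IndexError as exc:
        print("IndexError:", exc)        # IndexError: list index out of range
```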
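My process is very similar to the one in "Avoid recomputing size of all Cloud Storage files in Beam Python SDK". I checked the resources mentioned there, but I still can't resolve the error. What am I missing? I couldn't find any resources that address this specific error.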

Best answer

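I suggest using Dataflow Shuffle. Instead of keeping shuffle data on the persistent disks of the worker VMs, it keeps that data in the Dataflow service backend. The error is an index-out-of-range error and, as the comments show, the job runs fine with less data, which suggests that the workers are running out of memory or storage space.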
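The traceback does pass through GroupedShuffleReadOperation, so the failing read sits downstream of a shuffle. For the Python SDK, the Dataflow documentation from around the time of this answer requested service-based Dataflow Shuffle for batch jobs with the pipeline option --experiments=shuffle_mode=service; check the current Dataflow docs, since the default has changed over time and it may already be enabled in your region. The sketch below only assembles the argument list you would pass to apache_beam.options.pipeline_options.PipelineOptions(...); dataflow_args is a hypothetical helper, and the project, region and bucket values are placeholders.

```python
from typing import List


def dataflow_args(project: str, region: str, temp_location: str) -> List[str]:
    """Hypothetical helper: Dataflow pipeline arguments with service shuffle requested.

    The returned list is meant to be passed to
    apache_beam.options.pipeline_options.PipelineOptions(...).
    """
    return [
        "--runner=DataflowRunner",
        f"--project={project}",
        f"--region={region}",
        f"--temp_location={temp_location}",
        # Ask Dataflow to keep shuffle data in its backend instead of on the
        # workers' persistent disks.
        "--experiments=shuffle_mode=service",
    ]


if __name__ == "__main__":
    # Placeholder values; replace with your own project, region and bucket.
    print(" ".join(dataflow_args("my-project", "us-central1", "gs://my-bucket/tmp")))
```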
Also, keep in mind that you can use the Google-provided Dataflow templates. Note that these templates are written in Java, not Python. One of them is GCS Text to BigQuery.
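A rough sketch of launching that template, assuming the classic template stored at gs://dataflow-templates/latest/GCS_Text_to_BigQuery and its documented parameters (inputFilePattern, JSONPath, javascriptTextTransformGcsPath, javascriptTextTransformFunctionName, outputTable, bigQueryLoadingTemporaryDirectory); verify these names against the template's current documentation. The template transforms each input line with a JavaScript UDF, so the per-record logic would have to be ported to JavaScript, and whether it reads your .json.gz files as-is is something to confirm. The hypothetical helper template_command only builds a gcloud dataflow jobs run command for subprocess.run and does not execute it; every bucket, file, function and table name is a placeholder.

```python
from typing import Dict, List

TEMPLATE = "gs://dataflow-templates/latest/GCS_Text_to_BigQuery"


def template_command(job_name: str, region: str, params: Dict[str, str]) -> List[str]:
    """Hypothetical helper: gcloud command that launches the classic template."""
    # gcloud takes --parameters as comma-separated key=value pairs, so the
    # values used here must not themselves contain commas.
    joined = ",".join(f"{key}={value}" for key, value in params.items())
    return [
        "gcloud", "dataflow", "jobs", "run", job_name,
        f"--gcs-location={TEMPLATE}",
        f"--region={region}",
        f"--parameters={joined}",
    ]


if __name__ == "__main__":
    # Placeholder values only.
    cmd = template_command(
        "gcs-json-to-bq",
        "us-central1",
        {
            "inputFilePattern": "gs://my-bucket/my-prefix/*.json.gz",
            "JSONPath": "gs://my-bucket/schema.json",
            "javascriptTextTransformGcsPath": "gs://my-bucket/transform.js",
            "javascriptTextTransformFunctionName": "transform",
            "outputTable": "my-project:my_dataset.my_table",
            "bigQueryLoadingTemporaryDirectory": "gs://my-bucket/bq-tmp",
        },
    )
    # To launch for real: subprocess.run(cmd, check=True)
    print(" ".join(cmd))
```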

A similar question about this error can be found on Stack Overflow: https://stackoverflow.com/questions/63009565/
