
python - How to iterate over all files in Google Cloud Storage to use as Dataflow input?


Use case

I want to parse multiple files in Cloud Storage and insert the results into a BigQuery table.

Selecting one specific file to read works fine. However, when I swap that single file for a * glob pattern in order to include all files, I run into trouble.
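A minimal sketch of that change (the single-object path below is hypothetical; only the pattern handed to ReadFromText differs):

import apache_beam as beam
from apache_beam.io import ReadFromText

with beam.Pipeline() as pipe:
    # Reading one specific object works fine.
    one_file = pipe | 'read one' >> ReadFromText('gs://foobar-sink/2017/03/29/metrics.json')
    # Matching every object in the bucket is where the trouble starts.
    all_files = pipe | 'read glob' >> ReadFromText('gs://foobar-sink/*')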

I am executing the job like this:

python batch.py --project foobar --job_name foobar-metrics --runner DataflowRunner --staging_location gs://foobar-staging/dataflow --temp_location gs://foobar-staging/dataflow_temp --output foobar.test

This is my first Dataflow experiment and I am not sure how to debug it, or what the best practices are for a pipeline like this.
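As an aside, one way to sanity-check the pipeline before submitting it is to run it locally with the DirectRunner against a narrower glob (the runner flag and the date-prefixed pattern below are illustrative, not from the original post):

python batch.py --project foobar --runner DirectRunner --temp_location gs://foobar-staging/dataflow_temp --input "gs://foobar-sink/2017/01/*" --output foobar.test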

Expected result

I would expect the job to be uploaded to the Dataflow runner, and that collecting the list of files and iterating over each of them would happen in the cloud at run time. I would expect to be able to pass the contents of all files in the same way as when reading a single file.

Actual result

The job blocks while trying to submit it to the Cloud Dataflow runner.

Contents of batch.py

"""A metric sink workflow."""

from __future__ import absolute_import

import json
import argparse
import logging

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import SetupOptions
from apache_beam.utils.pipeline_options import GoogleCloudOptions

class ExtractDatapointsFn(beam.DoFn):
"""
Parse json documents and extract the metrics datapoints.
"""
def __init__(self):
super(ExtractDatapointsFn, self).__init__()
self.total_invalid = Metrics.counter(self.__class__, 'total_invalid')

def process(self, element):
"""
Process json that contains metrics of each element.

Args:
element: the element being processed.

Returns:
unmarshaled json for each metric point.
"""
try:
# Catch parsing errors as well as our custom key check.
document = json.loads(element)
if not "DataPoints" in document:
raise ValueError("missing DataPoints")
except ValueError:
self.total_invalid.inc(1)
return

for point in document["DataPoints"]:
yield point

def run(argv=None):
"""
Main entry point; defines and runs the pipeline.
"""
parser = argparse.ArgumentParser()
parser.add_argument('--input',
dest='input',
default='gs://foobar-sink/*',
help='Input file to process.')
parser.add_argument('--output',
required=True,
help=(
'Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
'or DATASET.TABLE.'))
known_args, pipeline_args = parser.parse_known_args(argv)
# We use the save_main_session option because one or more DoFn's in this
# workflow rely on global context (e.g., a module imported at module level).
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
pipeline_options.view_as(GoogleCloudOptions)
pipe = beam.Pipeline(options=pipeline_options)

# Read the json data and extract the datapoints.
documents = pipe | 'read' >> ReadFromText(known_args.input)
metrics = documents | 'extract datapoints' >> beam.ParDo(ExtractDatapointsFn())

# BigQuery sink table.
_ = metrics | 'write bq' >> beam.io.Write(
beam.io.BigQuerySink(
known_args.output,
schema='Path:STRING, Value:FLOAT, Timestamp:TIMESTAMP',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))

# Actually run the pipeline (all operations above are deferred).
result = pipe.run()
result.wait_until_finish()

total_invalid_filter = MetricsFilter().with_name('total_invalid')
query_result = result.metrics().query(total_invalid_filter)
if query_result['counters']:
total_invalid_counter = query_result['counters'][0]
logging.info('number of invalid documents: %d', total_invalid_counter.committed)
else:
logging.info('no invalid documents were found')

if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()

Best answer

We perform size estimation of sources at job submission time so that the Dataflow service can use that information when initializing the job (for example, to determine the initial number of workers). To estimate the size of a glob, we need to expand the glob. This can take a while (I believe several minutes for GCS) if the glob expands to more than 100k files. We will look into ways of improving the user experience here.
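A hedged note: newer Beam Python SDKs (not the apache_beam.utils.pipeline_options-era SDK used above) also ship ReadAllFromText, which takes a PCollection of file patterns and matches/reads them on the workers at execution time instead of expanding the glob during submission. A minimal sketch under that assumption, with hypothetical names run_deferred_glob and parse_fn:

import apache_beam as beam
from apache_beam.io import ReadAllFromText

def run_deferred_glob(pipeline_options, input_pattern, parse_fn):
    # The glob travels through the pipeline as data, so it is expanded on
    # the workers at execution time rather than at job submission.
    with beam.Pipeline(options=pipeline_options) as pipe:
        _ = (
            pipe
            | 'patterns' >> beam.Create([input_pattern])
            | 'read all' >> ReadAllFromText()
            | 'extract datapoints' >> beam.ParDo(parse_fn)
        )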

Regarding "python - How to iterate over all files in Google Cloud Storage to use as Dataflow input?", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/43095445/
