
python - Reading a collection of XML files with the Google Cloud Dataflow Python SDK


I am trying to read a collection of XML files from a GCS bucket and process them, where each element of the collection is a string representing a whole file. I can't find a suitable example of how to do this, and I can't work it out from the Apache Beam documentation, which is mostly about the Java version.

My current pipeline looks like this:

p = beam.Pipeline(options=PipelineOptions(pipeline_args))

(p
 | 'Read from a File' >> beam.io.Read(training_files_folder)
 | 'String To BigQuery Row' >> beam.Map(lambda s: data_ingestion.parse_method(s))
 | 'Write to BigQuery' >> beam.io.Write(
     beam.io.BigQuerySink(
         known_args.output,
         schema='title:STRING,text:STRING,id:STRING',
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run().wait_until_finish()

The error message I get is:

File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.1\helpers\pydev\pydevd.py", line 1664, in <module>
main()

File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.1\helpers\pydev\pydevd.py", line 1658, in main
globals = debugger.run(setup['file'], None, None, is_module)

File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.1\helpers\pydev\pydevd.py", line 1068, in run
pydev_imports.execfile(file, globals, locals) # execute the script

File "C:/Users/Tomer/PycharmProjects/hyperpartisan/cloud-version/data_ingestion.py", line 135, in <module>
run()

File "C:/Users/Tomer/PycharmProjects/hyperpartisan/cloud-version/data_ingestion.py", line 130, in run
p.run().wait_until_finish()
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 421, in wait_until_finish
self._executor.await_completion()
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\executor.py", line 398, in await_completion
self._executor.await_completion()
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\executor.py", line 444, in await_completion
six.reraise(t, v, tb)
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\executor.py", line 341, in call
finish_state)
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\executor.py", line 366, in attempt_call
side_input_values)
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\transform_evaluator.py", line 109, in get_evaluator
input_committed_bundle, side_inputs)
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\transform_evaluator.py", line 283, in __init__
self._source.pipeline_options = evaluation_context.pipeline_options
AttributeError: 'str' object has no attribute 'pipeline_options'

Any help is much appreciated. Thanks, Tomer

Edit: solved the first issue. It turns out this doesn't work with the DirectRunner; changing the runner to the DataflowRunner and replacing Read with ReadFromText resolved the exception:

p = beam.Pipeline(options=PipelineOptions(pipeline_args))

(p
 | 'Read from a File' >> beam.io.ReadFromText(training_files_folder)
 | 'String To BigQuery Row' >> beam.Map(lambda s: data_ingestion.parse_method(s))
 | 'Write to BigQuery' >> beam.io.Write(
     beam.io.BigQuerySink(
         known_args.output,
         schema='title:STRING,text:STRING,id:STRING',
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run().wait_until_finish()

But now I see that this approach gives me one pipeline element per line of each file, whereas I want each element to be the entire file as a single string. I'm not sure how to do that. I found this post, but it is in Java, and I'm not sure how it translates to Python, especially the GCS part.

So it seems ReadFromText won't work for my use case, and otherwise I don't know how to create a pipeline of files.

Solution: thanks to Ankur's help, I modified the code to include the steps required to convert the list of MatchResult objects that GCSFileSystem returns into a PCollection of strings, each representing one file path:

p = beam.Pipeline(options=PipelineOptions(pipeline_args))
gcs = GCSFileSystem(PipelineOptions(pipeline_args))
gcs_reader = GCSFileReader(gcs)

(p
 | 'Read Files' >> beam.Create([m.metadata_list for m in gcs.match([training_files_folder])])
 | 'metadata_list to filepath' >> beam.FlatMap(
     lambda metadata_list: [metadata.path for metadata in metadata_list])
 | 'string To BigQuery Row' >> beam.Map(
     lambda filepath: data_ingestion.parse_method(gcs_reader.get_string_from_filepath(filepath)))
 | 'Write to BigQuery' >> beam.io.Write(
     beam.io.BigQuerySink(
         known_args.output,
         schema='title:STRING,text:STRING,id:STRING',
         # Creates the table in BigQuery if it does not yet exist.
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         # Appends data to the BigQuery table.
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
p.run().wait_until_finish()

The code uses this helper class to read GCS files:

class GCSFileReader:
    """Helper class to read gcs files"""
    def __init__(self, gcs):
        self.gcs = gcs

    def get_string_from_filepath(self, filepath):
        with self.gcs.open(filepath) as reader:
            res = reader.read()
        return res
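One caveat worth noting (my observation, not something stated in the original post): GCSFileSystem.open() returns a binary file handle, so reader.read() yields bytes. On Python 2 that passes for str, but on Python 3 the helper would likely need to decode before XML parsing, roughly like this:

    def get_string_from_filepath(self, filepath):
        # gcs.open() gives a binary handle; decode explicitly when the
        # downstream parser expects text (assumes UTF-8 encoded XML).
        with self.gcs.open(filepath) as reader:
            return reader.read().decode('utf-8')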

Best Answer

ReadFromText reads the files at the given path line by line. What you want is to get a list of the files, then read each file whole, one at a time, in a ParDo using GcsFileSystem (https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/gcsfilesystem.py), and then write the contents to BigQuery.
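A minimal sketch of that approach might look like the following (assuming apache-beam[gcp] is installed; pipeline_args, training_files_folder, known_args, and data_ingestion.parse_method are carried over from the question's code and would need to exist in your module):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem


class ReadWholeFile(beam.DoFn):
    """Emits the entire contents of one GCS file per input path."""
    def __init__(self, pipeline_options):
        self.pipeline_options = pipeline_options

    def process(self, file_path):
        # Open the file on the worker and emit it as a single element.
        gcs = GCSFileSystem(self.pipeline_options)
        with gcs.open(file_path) as f:
            yield f.read()


options = PipelineOptions(pipeline_args)

# Expand the GCS glob into concrete file paths at pipeline-construction time.
gcs = GCSFileSystem(options)
file_paths = [metadata.path
              for match in gcs.match([training_files_folder])
              for metadata in match.metadata_list]

p = beam.Pipeline(options=options)
(p
 | 'File Paths' >> beam.Create(file_paths)
 | 'Read Whole Files' >> beam.ParDo(ReadWholeFile(options))
 | 'Parse To BigQuery Row' >> beam.Map(data_ingestion.parse_method)
 | 'Write to BigQuery' >> beam.io.Write(
     beam.io.BigQuerySink(
         known_args.output,
         schema='title:STRING,text:STRING,id:STRING')))
p.run().wait_until_finish()

Newer Beam releases also ship this pattern as built-in transforms (apache_beam.io.fileio.MatchFiles and ReadMatches), which move the file matching into the pipeline itself rather than doing it at construction time.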

You can also refer to a mail thread on a similar topic: https://lists.apache.org/thread.html/85da22a845cef8edd942fcc4906a7b47040a4ae8e10aef4ef00be233@%3Cuser.beam.apache.org%3E

Regarding python - Reading a collection of XML files with the Google Cloud Dataflow Python SDK, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/51962956/
