gpt4 book ai didi

python - 从 Apache Beam 中的多个文件夹读取文件并将输出映射到文件名

转载 作者:太空狗 更新时间:2023-10-29 21:09:16 24 4
gpt4 key购买 nike

致力于从多个文件夹中读取文件,然后使用 python sdk 和数据流运行器将文件名(文件内容,文件名)输出到 apache beam 中的 bigquery。

最初以为我可以为每个文件创建一个 pcollection,然后将文件内容与文件名映射。

def read_documents(pipeline):
"""Read the documents at the provided uris and returns (uri, line) pairs."""
pcolls = []
count = 0
with open(TESTIN) as uris:
for uri in uris:
#print str(uri).strip("[]/'")
pcolls.append(
pipeline
| 'Read: uri' + str(uri) >>ReadFromText(str(uri).strip("[]/'"), compression_type = 'gzip')
| 'WithKey: uri' + str(uri) >> beam.Map(lambda v, uri: (v, str(uri).strip("[]")), uri)
)
return pcolls | 'FlattenReadPColls' >> beam.Flatten()

这工作正常但速度很慢并且在大约 10000 个文件后无法在数据流云上工作。如果超过 10000 个左右的文件,它将遭受管道损坏。

目前正在尝试重载 Text.io 中的 ReadAllFromText 函数。 Text.io 旨在从一组文件名或模式中快速读取大量文件。如果从 Google 云存储读取并且文件具有内容编码,则此模块中存在错误。 Google 云存储会自动压缩文件并对其进行转码,但出于某种原因,ReadAllFromText 无法使用它。您必须更改文件的元数据以删除内容编码并将 ReadAllFromText 上的压缩类型设置为 gzip。我包括这个问题 url,以防其他人遇到 ReadAllFromText 问题 https://issues.apache.org/jira/browse/BEAM-1874

我现在的代码是这样的

class ReadFromGs(ReadAllFromText):

def __init__(self):
super(ReadFromGs, self).__init__(compression_type="gzip")

def expand(self, pvalue):
files = self._read_all_files
return (
pvalue
| 'ReadAllFiles' >> files #self._read_all_files
| 'Map values' >> beam.Map( lambda v: (v, filename)) # filename is a placeholder for the input filename that im trying to figure out how to include in the output.
)

ReadAllFromText 包含在 Text.io 中,从 filebasedsource.py 调用 ReadAllText 并继承自 PTransform。

我相信我只是遗漏了一些简单的遗漏。

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filebasedsource.py

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/textio.py

最佳答案

如您所见,ReadFromText 目前不支持动态文件名,您绝对不想为每个 URL 创建单独的步骤。从您最初的句子中,我了解到您希望将文件名和文件内容作为一个项目取出。这意味着您将不需要或受益于文件部分的任何流式传输。您可以简单地阅读文件内容。像这样的东西:

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems


def read_all_from_url(url):
with FileSystems.open(url) as f:
return f.read()


def read_from_urls(pipeline, urls):
return (
pipeline
| beam.Create(urls)
| 'Read File' >> beam.Map(lambda url: (
url,
read_all_from_url(url)
))
)

如果您认为元数据有问题,您可以对其进行自定义。输出将是一个元组(url文件内容)。如果您的文件内容非常大,您可能需要根据您的用例采用略有不同的方法。

关于python - 从 Apache Beam 中的多个文件夹读取文件并将输出映射到文件名,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51433664/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com