
Python/Apache Beam: How to Parse Text File To CSV?

Reposted. Author: 行者123. Updated: 2023-12-02 16:47:14

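I'm still new to Beam, but how exactly do you read CSV files from a GCS bucket? Essentially, I use Beam to convert these files into pandas DataFrames and then apply an sklearn model to "train" on that data. Most of the examples I've seen define the header up front, but I want this Beam pipeline to generalize to any file, and the headers will definitely differ between files. There is a library called beam_utils that does what I want, but then I ran into this error: AttributeError: module 'apache_beam.io.fileio' has no attribute 'CompressionTypes'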

Code sample:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# The error occurs in this import
from beam_utils.sources import CsvFileSource

options = {
    'project': 'my-project',
    'runner': 'DirectRunner',
    'streaming': False
}

pipeline_options = PipelineOptions(flags=[], **options)

class Printer(beam.DoFn):
    def process(self, element):
        print(element)

with beam.Pipeline(options=pipeline_options) as p:  # Create the Pipeline with the specified options.

    data = (p
            | 'Read File From GCS' >> beam.io.textio.ReadFromText('gs://my-csv-files')
            )

    _ = (data | "Print the data" >> beam.ParDo(Printer()))

# Leaving the `with` block runs the pipeline and waits for it to finish,
# so no separate p.run() / wait_until_finish() call is needed.

Best answer

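The Apache Beam module fileio was recently changed in backward-incompatible ways, so `CompressionTypes` is no longer available from `apache_beam.io.fileio`, and the beam_utils library has not been updated to match.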

I went through the question @Pablo suggested and the source code of beam_utils (also written by Pablo) to replicate its behavior using the filesystems module.
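The key point in both versions below is that `csv.DictReader` takes its field names from the first row of the file, so no header has to be defined in advance. A minimal stdlib-only sketch of that mechanism, assuming an in-memory `io.BytesIO` stands in for the binary channel that `FileSystems.open` returns (the helper name `rows_from_binary_stream` and the sample bytes are hypothetical):

```python
import csv
import io


def rows_from_binary_stream(binary_stream):
    """Decode a binary file object as UTF-8 and parse it as CSV.

    csv.DictReader takes its field names from the first row, so files
    with different headers need no predefined schema.
    """
    text = io.TextIOWrapper(binary_stream, encoding="utf-8", newline="")
    return list(csv.DictReader(text))


# Hypothetical stand-ins for the binary channels FileSystems.open returns.
rows = rows_from_binary_stream(io.BytesIO(b"a,b\n1,2\n3,4\n5,6\n"))
other_rows = rows_from_binary_stream(io.BytesIO(b"x,y,z\n7,8,9\n"))

print(rows)        # three rows keyed by "a" and "b"
print(other_rows)  # one row keyed by "x", "y" and "z"
```

The same function handles both files without any header configuration, which is the property the question asks for.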

Below are two versions of the code that use pandas to build a DataFrame.

CSV used in the examples:

a,b
1,2
3,4
5,6

Read the CSV and create a DataFrame containing all of its contents

import csv
import io

import apache_beam as beam
import pandas as pd

def create_dataframe(readable_file):

    # Open a channel to read the file from GCS (closed when the block exits)
    with beam.io.filesystems.FileSystems.open(readable_file) as gcs_file:

        # Read it as csv (the header comes from the file's first row);
        # you can also use csv.reader
        csv_dict = csv.DictReader(io.TextIOWrapper(gcs_file, encoding='utf-8'))

        # Create the DataFrame
        dataFrame = pd.DataFrame(csv_dict)

    print(dataFrame.to_string())

p = beam.Pipeline()
(p | beam.Create(['gs://my-bucket/my-file.csv'])
   | beam.FlatMap(create_dataframe)
)

p.run().wait_until_finish()

Resulting DataFrame

   a  b
0  1  2
1  3  4
2  5  6
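Because `csv.DictReader` yields every value as a string, the DataFrame built above holds text rather than numbers, which usually needs converting before numeric work such as model training. A minimal sketch of two ways to get numeric columns, assuming every column is numeric as in the sample CSV and again using `io.BytesIO` as a stand-in for the GCS channel:

```python
import csv
import io

import pandas as pd

CSV_BYTES = b"a,b\n1,2\n3,4\n5,6\n"  # same sample data as above

# Option 1: keep csv.DictReader, then convert every column to numbers.
reader = csv.DictReader(io.TextIOWrapper(io.BytesIO(CSV_BYTES), encoding="utf-8"))
from_dict_reader = pd.DataFrame(reader).apply(pd.to_numeric)

# Option 2: let pandas parse the binary stream and infer dtypes itself.
from_read_csv = pd.read_csv(io.BytesIO(CSV_BYTES))

print(from_dict_reader.dtypes)
print(from_read_csv.dtypes)
```

`pd.read_csv` accepts a file-like object, so it could take the channel from `FileSystems.open` directly; `apply(pd.to_numeric)` raises on non-numeric text, so mixed columns would need per-column handling.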

Read the CSV and create the DataFrame in a separate transform

# Uses the same imports (apache_beam, pandas, csv, io) as the first version
def get_csv_reader(readable_file):

    # Open a channel to read the file from GCS
    gcs_file = beam.io.filesystems.FileSystems.open(readable_file)

    # Return the csv reader; FlatMap emits one dict per csv row
    return csv.DictReader(io.TextIOWrapper(gcs_file, encoding='utf-8'))

p = beam.Pipeline()
(p | beam.Create(['gs://my-bucket/my-file.csv'])
   | beam.FlatMap(get_csv_reader)
   | beam.Map(lambda x: pd.DataFrame([x]))  # Create the DataFrame from each csv row
   | beam.Map(lambda x: print(x.to_string()))
)

p.run().wait_until_finish()

Resulting DataFrames

   a  b
0  1  2
   a  b
0  3  4
   a  b
0  5  6
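In the second version, `get_csv_reader` returns the reader without ever closing the channel it opened. One possible variation is a generator that keeps the file open only while its rows are consumed. This is a sketch under the assumption that the opener is passed in as a parameter; the names `iter_csv_rows`, `opener`, `FAKE_FILES` and `fake_open` are hypothetical, and the demo uses an in-memory stream instead of GCS:

```python
import csv
import io


def iter_csv_rows(path, opener):
    """Yield one dict per CSV row, closing the file once all rows are read.

    `opener` is any callable that returns a binary file object for `path`
    (in a Beam pipeline this could be FileSystems.open).
    """
    with opener(path) as raw, io.TextIOWrapper(raw, encoding="utf-8", newline="") as text:
        yield from csv.DictReader(text)


# Hypothetical in-memory "file system" used instead of GCS for the demo.
FAKE_FILES = {"gs://my-bucket/my-file.csv": b"a,b\n1,2\n3,4\n5,6\n"}


def fake_open(path):
    return io.BytesIO(FAKE_FILES[path])


for row in iter_csv_rows("gs://my-bucket/my-file.csv", fake_open):
    print(row)
```

Because `beam.FlatMap` forwards extra keyword arguments to its function, a pipeline step such as `beam.FlatMap(iter_csv_rows, opener=beam.io.filesystems.FileSystems.open)` should be able to replace `beam.FlatMap(get_csv_reader)`; this has not been checked against a real GCS bucket.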

Regarding Python/Apache Beam: How to Parse Text File To CSV?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/59994645/
