gpt4 book ai didi

python - 将具有不同列顺序和字段名的多个CSV文件读入Spark

转载 作者:太空狗 更新时间:2023-10-30 01:36:34 36 4
gpt4 key购买 nike

我有一个csv文件目录,我想读入spark数据框。我知道当文件具有相同的字段名和列顺序时,这是直接的:

raw_transactions_df = spark.read.csv("file_*.csv", inferSchema=True, header=True)

但是,由于我的文件来自不同的系统:
它们没有相同的列顺序。
在某些文件中,其中一个字段名的拼写不同
在这种情况下,是否有一种干净的方法以可重复的方式将所有csv文件从一个目录加载到一个公共spark数据帧中?
我的第一次尝试如下:
import csv

final_headers = ['col1', 'col2', 'col3', 'col4', 'col5', 'col6', 'col7']

merged_rows = []
for f in trans_files:
with open(f, 'r') as csv_in:
csvreader = csv.reader(csv_in, delimiter=',')
headers = dict((h, i) for i, h in enumerate(next(csvreader)))
headers = { x.replace('col7_id', 'col7'): headers[x] for x in headers.keys() }

for row in csvreader:
merged_rows.append(tuple(row[headers[x]] for x in final_headers))

merged_df = spark.createDataFrame(merged_rows, final_headers)

这在一定程度上起作用,但会导致df,其中所有列都是stringtype。如果我试图将定义的架构传递给spark.createdataframe,则会出现异常:
TypeError: DecimalType(16,0) can not accept object '83215400105' in type <class 'str'>

编辑:我知道我可以根据需要和快乐的日子显式地转换我的列,但是如果我们遇到另一个列顺序或列名称拼写错误,所有这些看起来都是非常手动的,并且容易被破坏。
那么-有没有一个好的策略,我可以用它把csv从一个目录加载到上面我已经指定的挑战的spark中?

最佳答案

是的,你可以
如果行的顺序不同,但您在标题中有行名(我假设您使用的是header=True,请阅读它们)。一旦您处于数据帧上下文中,底层顺序就不重要了,因为您无论如何都是按名称寻址行的
在行名不同的特定情况下,您将得到这些行的col7null和col7_id填充,这可以通过sql在post中修复
如果由于某种原因无法读取整个目录,只需遍历所有文件,在循环中创建一个df并使用df.union-函数,那么您甚至可以在那里处理col7_id

关于python - 将具有不同列顺序和字段名的多个CSV文件读入Spark,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46591207/

36 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com