gpt4 book ai didi

pyspark - 读取文件并将其附加到 spark 数据框中

转载 作者:行者123 更新时间:2023-12-02 00:50:49 26 4
gpt4 key购买 nike

我创建了一个空数据框并开始通过读取每个文件向其中添加内容。但是其中一个文件的列数比前一个多。如何为所有其他文件只选择第一个文件中的列?

from pyspark.sql import SparkSession

from pyspark.sql import SQLContext

from pyspark.sql.types import StructType
import os, glob
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()
fpath=''
schema = StructType([])
sc = spark.sparkContext
df_spark=spark.createDataFrame(sc.emptyRDD(), schema)
files=glob.glob(fpath +'*.sas7bdat')
for i,f in enumerate(files):
if i == 0:
df=spark.read.format('com.github.saurfang.sas.spark').load(f)
df_spark= df
else:
df=spark.read.format('com.github.saurfang.sas.spark').load(f)
df_spark=df_spark.union(df)

最佳答案

您可以在创建数据框时提供自己的架构。例如,我有两个文件 emp1.csv & emp2.​​csv 具有不同的架构。

id,empname,empsalary
1,Vikrant,55550

id,empname,empsalary,age,country
2,Raghav,10000,32,India

schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("salary", IntegerType(), True)])

file_path="file:///home/vikct001/user/vikrant/inputfiles/testfiles/emp*.csv"
df=spark.read.format("com.databricks.spark.csv").option("header", "true").schema(schema).load(file_path)

指定架构不仅可以解决数据类型和格式问题,而且对于提高性能也是必要的。

如果您需要删除格式错误的记录,还有其他选项,但这也会删除包含空值或不符合提供的架构的记录。它可能会跳过那些也有多个分隔符和垃圾字符或空文件的记录。

.option("mode", "DROPMALFORMED")

FAILFAST 模式将在发现格式错误的记录时抛出异常。

.option("mode", "FAILFAST")

您还可以使用 map 函数来选择您选择的元素并在构建数据框时排除其他元素。

df=spark.read.format('com.databricks.spark.csv').option("header", "true").load(file_path).rdd.map(lambda x :(x[0],x[1],x[2])).toDF(["id","name","salary"])

在这两种情况下,您都需要将 header 设置为“true”,否则它将包含您的 csv header 作为数据框的第一条记录。

关于pyspark - 读取文件并将其附加到 spark 数据框中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57824016/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com