gpt4 book ai didi

csv - 将 csv 文件读取为 spark 数据帧

转载 作者:行者123 更新时间:2023-12-01 11:07:21 28 4
gpt4 key购买 nike

我有一个 CSV 文件和一个必须通过 Spark(2.0.0 和 Scala 2.11.8)作为数据帧读取的 header 。

示例 csv 数据:

Item,No. of items,Place
abc,5,xxx
def,6,yyy
ghi,7,zzz
.........

当我尝试将 spark 中的此 csv 数据作为数据框读取时,我遇到了问题,因为标题包含具有特殊字符“.”的列(项目编号)

我尝试读取 csv 数据的代码是:

val spark = SparkSession.builder().appName("SparkExample")
import spark.implicits._
val df = spark.read.option("header", "true").csv("file:///INPUT_FILENAME")

我遇到的错误:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Unable to resolve No. of items given [Item,No. of items,Place];

如果我从 header 中删除 ".",我将不会收到任何错误。甚至尝试转义字符,但它甚至从数据中转义了所有 "." 字符。

有什么方法可以使用 spark 代码从 CSV header 中转义特殊字符 "." 吗?

最佳答案

@Pooja Nayak,不确定是否已解决;为了社区的利益回答这个问题。

sc: SparkContext
spark: SparkSession
sqlContext: SQLContext

// Read the raw file from localFS as-is.
val rdd_raw = sc.textFile("file:///home/xxxx/sample.csv")

// Drop the first line in first partition because it is the header.
val rdd = rdd_raw.mapPartitionsWithIndex{(idx,iter) =>
if(idx == 0) iter.drop(1) else iter
}

// A function to create schema dynamically.
def schemaCreator(header: String): StructType = {
StructType(header
.split(",")
.map(field => StructField(field.trim, StringType, true))
)
}

// Create the schema for the csv that was read and store it.
val csvSchema: StructType = schemaCreator(rdd_raw.first)

// As the input is CSV, split it at "," and trim away the whitespaces.
val rdd_curated = rdd.map(x => x.split(",").map(y => y.trim)).map(xy => Row(xy:_*))

// Create the DF from the RDD.
val df = sqlContext.createDataFrame(rdd_curated, csvSchema)

导入必要的

import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark._

关于csv - 将 csv 文件读取为 spark 数据帧,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46072751/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com