
scala - How to create a dataframe from a csv in Spark (using scala) when the first line is the schema?


I am new to Spark and I am coding in Scala. I want to read a file from HDFS or S3 and convert it into a Spark DataFrame. The first line of the CSV file is the schema, but how do I create a DataFrame when the columns are not known ahead of time? I used the following code to create a DataFrame for a known schema.

// Assumes sc (SparkContext) and sqlContext (SQLContext) are already in scope,
// e.g. in the Spark shell.
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{StructType, StructField, StringType}

def loadData(path: String): DataFrame = {
  val rdd = sc.textFile(path)

  // The first line holds the column names; every field is typed as a string.
  val firstLine = rdd.first()
  val schema = StructType(firstLine.split(',').map(fieldName => StructField(fieldName, StringType, true)))

  // Drop the header row, which sits in the first partition.
  val noHeader = rdd.mapPartitionsWithIndex(
    (i, iterator) =>
      if (i == 0 && iterator.hasNext) {
        iterator.next()
        iterator
      } else iterator)

  // This hardcodes six columns, the part I want to generalize.
  val rowRDD = noHeader.map(_.split(",")).map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5)))

  sqlContext.createDataFrame(rowRDD, schema)
}
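
As an aside, the per-line mapping above can be made independent of the column count with Row.fromSeq; a minimal sketch, assuming every column keeps the StringType of the header-derived schema:

// Build one Row per line without hardcoding indices;
// the -1 split limit preserves trailing empty fields.
val rowRDD = noHeader.map(line => Row.fromSeq(line.split(",", -1).toSeq))
val dataFrame = sqlContext.createDataFrame(rowRDD, schema)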

Best Answer

Dear Hammad, you can try the code below:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._

// Wrapped in an object so the snippet compiles standalone.
object CsvLoader {
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))
  val sqlcon = new SQLContext(sc)

  def main(args: Array[String]) {
    // Comma-separated list of columnName:type
    val schemaString = "Id:int,FirstName:text,LastName:text,Email:string,Country:text"
    val schema =
      StructType(
        schemaString.split(",").map(fieldName => StructField(fieldName.split(":")(0),
          getFieldTypeInSchema(fieldName.split(":")(1)), true)))
    val rdd = sc.textFile("/users.csv")

    // Skip the header row; it only appears in the first partition.
    val noHeader = rdd.mapPartitionsWithIndex(
      (i, iterator) =>
        if (i == 0 && iterator.hasNext) {
          iterator.next()
          iterator
        } else iterator)

    // Convert each token to the Scala type its schema field expects.
    val rowRDDx = noHeader.map(p => {
      var list: collection.mutable.Seq[Any] = collection.mutable.Seq.empty[Any]
      var index = 0
      val tokens = p.split(",")
      tokens.foreach(value => {
        val valType = schema.fields(index).dataType
        var returnVal: Any = null
        valType match {
          case IntegerType => returnVal = value.toString.toInt
          case DoubleType => returnVal = value.toString.toDouble
          case LongType => returnVal = value.toString.toLong
          case FloatType => returnVal = value.toString.toFloat
          case ByteType => returnVal = value.toString.toByte
          case BooleanType => returnVal = value.toString.toBoolean
          // Expects "yyyy-MM-dd HH:mm:ss"; adapt the parsing to your data.
          case TimestampType => returnVal = java.sql.Timestamp.valueOf(value.toString)
          case _ => returnVal = value.toString
        }
        list = list :+ returnVal
        index += 1
      })
      Row.fromSeq(list)
    })

    // applySchema is deprecated since Spark 1.3; createDataFrame replaces it.
    val df = sqlcon.createDataFrame(rowRDDx, schema)
  }

  def getFieldTypeInSchema(ftype: String): DataType = {
    ftype match {
      case "int" => IntegerType
      case "double" => DoubleType
      case "long" => LongType
      case "float" => FloatType
      case "byte" => ByteType
      case "string" => StringType
      case "date" => TimestampType
      case "timestamp" => TimestampType
      case "uuid" => StringType
      case "decimal" => DoubleType
      case "boolean" => BooleanType
      case "counter" => LongType
      case "bigint" => LongType
      case "text" => StringType
      case "ascii" => StringType
      case "varchar" => StringType
      case "varint" => IntegerType
      case _ => StringType
    }
  }
}
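
On newer Spark versions the header handling is built in; a minimal sketch, assuming Spark 2.x with a SparkSession (on Spark 1.x the same options are available through the spark-csv package):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").appName("test").getOrCreate()
// header=true takes column names from the first line;
// inferSchema=true samples the data to guess column types.
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/users.csv")
df.printSchema()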

Hope this helps. :)

On the question of how to create a dataframe from a csv in Spark (using scala) when the first line is the schema, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/31397845/
