gpt4 book ai didi

scala - 在不同列的 Spark 中读取 csv 文件

转载 作者:行者123 更新时间:2023-12-04 19:52:04 40 4
gpt4 key购买 nike

我想使用 Scala 将 csv 文件读入 spark 中的数据帧。我的 csv 文件有第一条记录,它有三列,其余记录有 5 列。我的 csv 文件没有列名。我在这里提到是为了理解

Ex:
I'dtype date recordsCount
0 13-02-2015 300
I'dtype date type location. locationCode
1 13-02-2015. R. USA. Us
1. 13-02-2015. T. London. Lon

我的问题是如何将此文件读入数据框,因为第一行和其余行有不同的列。我尝试的解决方案是将文件读取为 rdd 并过滤掉标题记录,然后将剩余的记录转换为数据帧。有没有更好的解决方案?请帮助我

最佳答案

您可以将文件作为原始文本加载,然后使用案例类、Either 实例和模式匹配来分清哪些内容去哪里。下面的例子。

case class Col3(c1: Int, c2: String, c3: Int)
case class Col5(c1: Int, c2: String, c5_col3: String, c4:String, c5: String)
case class Header(value: String)

type C3 = Either[Header, Col3]
type C5 = Either[Header, Col5]

// assume sqlC & sc created

val path = "tmp.tsv"
val rdd = sc.textFile(path)

val eitherRdd: RDD[Either[C3, C5]] = rdd.map{s =>
val spl = s.split("\t")
spl.length match{
case 3 =>
val res = Try{
Col3(spl(0).toInt, spl(1), spl(2).toInt)
}
res match{
case Success(c3) => Left(Right(c3))
case Failure(_) => Left(Left(Header(s)))
}
case 5 =>
val res = Try{
Col5(spl(0).toInt, spl(1), spl(2), spl(3), spl(4))
}
res match{
case Success(c5) => Right(Right(c5))
case Failure(_) => Right(Left(Header(s)))
}
case _ => throw new Exception("fail")
}
}

val rdd3 = eitherRdd.flatMap(_.left.toOption)
val rdd3Header = rdd3.flatMap(_.left.toOption).collect().head
val df3 = sqlC.createDataFrame(rdd3.flatMap(_.right.toOption))

val rdd5 = eitherRdd.flatMap(_.right.toOption)
val rdd5Header = rdd5.flatMap(_.left.toOption).collect().head
val df5 = sqlC.createDataFrame(rdd5.flatMap(_.right.toOption))

df3.show()

df5.show()

使用下面的简单 tsv 进行测试:

col1    col2    col3
0 sfd 300
1 asfd 400
col1 col2 col4 col5 col6
2 pljdsfn R USA Us
3 sad T London Lon

给出输出

+---+----+---+
| c1| c2| c3|
+---+----+---+
| 0| sfd|300|
| 1|asfd|400|
+---+----+---+

+---+-------+-------+------+---+
| c1| c2|c5_col3| c4| c5|
+---+-------+-------+------+---+
| 2|pljdsfn| R| USA| Us|
| 3| sad| T|London|Lon|
+---+-------+-------+------+---+

为简单起见,我忽略了日期格式,只是将这些字段存储为字符串。然而,添加日期解析器以获得正确的列类型并不会复杂得多。

同样,我依靠解析失败来指示标题行。如果解析不会失败,或者必须做出更复杂的决定,您可以替换不同的逻辑。同样,需要更复杂的逻辑来区分相同长度的不同记录类型,或者可能包含(转义的)拆分字符

关于scala - 在不同列的 Spark 中读取 csv 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55343250/

40 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com