gpt4 book ai didi

scala - 使用 Spark 中的动态列将 RDD 数据写入 CSV - Scala

转载 作者:可可西里 更新时间:2023-11-01 14:51:24 25 4
gpt4 key购买 nike

我正在从 HDFS 目录读取多个文件,并且对于每个文件,生成的数据使用以下方式打印:

frequencies.foreach(x => println(x._1 + ": "+x._2))

打印的数据是(对于File1.txt):

'text': 45
'data': 100
'push': 150

其他文件的 key 可能不同,例如 (File2.txt):

'data': 45
'lea': 100
'jmp': 150

key 不一定在所有文件中都相同。我希望将所有文件数据写入以下格式的 .csv 文件:

Filename   text  data  push  lea  jmp
File1.txt 45 100 150 0 0
File2.txt 0 45 0 100 150 ....

有人可以帮我找到解决这个问题的方法吗?

最佳答案

如果你的文件不够大,你可以在没有 Spark 的情况下完成。这是我的示例代码,csv 格式是旧样式,不符合您的预期输出,但您可以轻松调整它。

  import scala.io.Source
import org.apache.hadoop.fs._
val sparkSession = ... // I created it to retrieve hadoop configuration, you can create your own Configuration.
val inputPath = ...
val outputPath = ...

val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
// read all files content to Array of Map[String,String]
val filesContent = fs.listStatus(new Path(inputPath)).filter(_.isFile).map(_.getPath).filter(_.getName.endsWith(".txt"))
.map(s => (s.getName, Source.fromInputStream(fs.open(s)).getLines()
.map(_.split(":").map(_.trim))
.filter(_.length == 2)
.map(p => (p.head, p.last)).toMap))
// create default Map with all possible keys
val listKeys = filesContent.flatMap(_._2.keys).distinct.map(s => (s, "0")).toMap
val csvContent = filesContent.map(s => (s._1, listKeys ++ s._2))
.map(s => (s._1, s._2.values.mkString(",")))
.map(s => s"${s._1},${s._2}")
.mkString("\n")
val csvHeader = ("Filename" +: listKeys.keys.toList).mkString(",")
val csv = csvHeader + "\n" + csvContent

new PrintWriter(fs.create(new Path(outputPath))){
write(csv)
close()
}

关于scala - 使用 Spark 中的动态列将 RDD 数据写入 CSV - Scala,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47518880/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com