作者热门文章
- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
我有 620 个 csv 文件,它们有不同的列和数据。例如:
//file1.csv
word, count1
w1, 100
w2, 200
//file2.csv
word, count2
w1, 12
w5, 22
//Similarly fileN.csv
word, countN
w7, 17
w2, 28
我的预期输出
//result.csv
word, count1, count2, countN
w1, 100, 12, null
w2, 200 , null, 28
w5, null, 22, null
w7, null, null, 17
我能够在 Scala 中为这样的两个文件执行此操作,其中 df1
是 file1.csv
而 df2
是 file2 .csv
:
df1.join(df2, Seq("word"),"fullouter").show()
我需要任何解决方案,无论是在 Scala 还是 Linux 命令中都可以。
最佳答案
使用 Spark,您可以将所有文件作为 Dataframe
读取并将其存储在 List[Dataframe]
中。之后,您可以在该 List
上应用 reduce
以将所有数据帧连接在一起。以下是使用三个数据帧的代码,但您可以对所有文件进行扩展和使用。
//create all three dummy DFs
val df1 = sc.parallelize(Seq(("w1", 100), ("w2", 200))).toDF("word", "count1")
val df2 = sc.parallelize(Seq(("w1", 12), ("w5", 22))).toDF("word", "count2")
val df3 = sc.parallelize(Seq(("w7", 17), ("w2", 28))).toDF("word", "count3")
//store all DFs in a list
val dfList: List[DataFrame] = List(df1, df2, df3)
//apply reduce function to join them together
val joinedDF = dfList.reduce((a, b) => a.join(b, Seq("word"), "fullouter"))
joinedDF.show()
//output
//+----+------+------+------+
//|word|count1|count2|count3|
//+----+------+------+------+
//| w1| 100| 12| null|
//| w2| 200| null| 28|
//| w5| null| 22| null|
//| w7| null| null| 17|
//+----+------+------+------+
//To write to CSV file
joinedDF.write
.option("header", "true")
.csv("PATH_OF_CSV")
这是您读取所有文件并将其存储在列表中的方式
//declare a ListBuffer to store all DFs
import scala.collection.mutable.ListBuffer
val dfList = ListBuffer[DataFrame]()
(1 to 620).foreach(x=>{
val df: DataFrame = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.load(BASE_PATH + s"file$x.csv")
dfList += df
})
关于linux - 如何对多个 csv 文件(Linux 或 Scala)进行完全外部联接?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49293117/
我是一名优秀的程序员,十分优秀!