gpt4 book ai didi

scala - 如何对spark Dataframe进行合并操作?

转载 作者:行者123 更新时间:2023-12-05 09:19:35 24 4
gpt4 key购买 nike

我有 spark dataframe mainDFdeltaDF 都具有匹配的模式。

mainDF的内容如下:

id | name | age
1 | abc | 23
2 | xyz | 34
3 | pqr | 45

deltaDF的内容如下:

id | name | age
1 | lmn | 56
4 | efg | 37

我想根据 id 的值将 deltaDFmainDF 合并。因此,如果我的 id 已经存在于 mainDF 中,则应更新记录,如果 id 不存在,则应添加新记录。所以生成的数据框应该是这样的:

id | name | age
1 | lmn | 56
2 | xyz | 34
3 | pqr | 45
4 | efg | 37

这是我当前的代码,它正在运行:

  val updatedDF = mainDF.as("main").join(deltaDF.as("delta"),$"main.id" === $"delta.id","inner").select($"main.id",$"main.name",$"main.age")
mainDF= mainDF.except(updateDF).unionAll(deltaDF)

但是在这里我需要在选择函数中再次明确提供列表列,这对我来说感觉像是开销。有没有其他更好/更清洁的方法来实现同样的目标?

最佳答案

如果您不想明确提供列列表,您可以映射原始 DF 的列,例如:

.select(mainDF.columns.map(c => $"main.$c" as c): _*)

顺便说一句,您可以在 join 之后没有 union 的情况下执行此操作:您可以使用 outer join 来获取不存在的记录两个 DF,然后使用 coalesce 来“选择”偏好 deltaDF 值的非空值。所以完整的解决方案是这样的:

val updatedDF = mainDF.as("main")
.join(deltaDF.as("delta"), $"main.id" === $"delta.id", "outer")
.select(mainDF.columns.map(c => coalesce($"delta.$c", $"main.$c") as c): _*)

updatedDF.show
// +---+----+---+
// | id|name|age|
// +---+----+---+
// | 1| lmn| 56|
// | 3| pqr| 45|
// | 4| efg| 37|
// | 2| xyz| 34|
// +---+----+---+

关于scala - 如何对spark Dataframe进行合并操作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40169997/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com