gpt4 book ai didi

scala - 在 spark 中对多个 DataFrame 执行连接

转载 作者:行者123 更新时间:2023-12-04 14:40:57 25 4
gpt4 key购买 nike

我有 3 个由 3 个不同进程生成的数据帧。
每个数据框都有同名的列。
我的数据框看起来像这样

id   val1    val2       val3    val4
1 null null null null
2 A2 A21 A31 A41

id val1 val2 val3 val4
1 B1 B21 B31 B41
2 null null null null

id val1 val2 val3 val4
1 C1 C2 C3 C4
2 C11 C12 C13 C14

在这 3 个数据帧中,我想创建两个数据帧(最终和合并)。
最后,偏好的顺序 -
数据框 1 > 数据框 2 > 数据框 3

如果结果在数据帧 1(val1 != null) 中,我将在最终数据帧中存储该行。

我的最终结果应该是:
id  finalVal1    finalVal2   finalVal3   finalVal4 
1 B1 B21 B31 B41
2 A2 A21 A31 A41

Consolidated Dataframe 将存储所有 3 个的结果。

我怎样才能有效地做到这一点?

最佳答案

如果我理解正确,对于每一行,您要找出第一个非空值,首先查看第一个表,然后是第二个表,然后是第三个表。

您只需要根据 id 加入这三个表。然后使用 coalesce获取第一个非空元素的函数

import org.apache.spark.sql.functions._

val df1 = sc.parallelize(Seq(
(1,null,null,null,null),
(2,"A2","A21","A31", "A41"))
).toDF("id", "val1", "val2", "val3", "val4")

val df2 = sc.parallelize(Seq(
(1,"B1","B21","B31", "B41"),
(2,null,null,null,null))
).toDF("id", "val1", "val2", "val3", "val4")

val df3 = sc.parallelize(Seq(
(1,"C1","C2","C3","C4"),
(2,"C11","C12","C13", "C14"))
).toDF("id", "val1", "val2", "val3", "val4")

val consolidated = df1.join(df2, "id").join(df3, "id").select(
df1("id"),
coalesce(df1("val1"), df2("val1"), df3("val1")).as("finalVal1"),
coalesce(df1("val2"), df2("val2"), df3("val2")).as("finalVal2"),
coalesce(df1("val3"), df2("val3"), df3("val3")).as("finalVal3"),
coalesce(df1("val4"), df2("val4"), df3("val4")).as("finalVal4")
)

这为您提供了预期的输出
+---+----+----+----+----+
| id|val1|val2|val3|val4|
+---+----+----+----+----+
| 1| B1| B21| B31| B41|
| 2| A2| A21| A31| A41|
+---+----+----+----+----+

关于scala - 在 spark 中对多个 DataFrame 执行连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39976328/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com