gpt4 book ai didi

scala - 为 Delta Data 更新 Spark Dataframe 的窗口函数 row_number 列

转载 作者:行者123 更新时间:2023-12-04 12:15:41 26 4
gpt4 key购买 nike

我需要为增量数据更新数据帧的行号列。我已经实现了基本负载的行号,如下所示:

输入数据:

val base = List(List("001", "a", "abc"), List("001", "a", "123"),List("003", "c", "456") ,List("002", "b", "dfr"), List("003", "c", "ytr"))
.map(row => (row(0), row(1), row(2)))

val DS1 = base.toDF("KEY1", "KEY2" ,"VAL")

DS1.show()

+----+----+---+
|KEY1|KEY2|VAL|
+----+----+---+
| 001| a|abc|
| 001| a|123|
| 003| c|456|
| 002| b|dfr|
| 003| c|ytr|
+----+----+---+

现在我使用窗口函数添加了行号,如下所示:
val baseDF =  DS1.select(col("KEY1"), col("KEY2"), col("VAL") ,row_number().over(Window.partitionBy(col("KEY1"), col("KEY2")).orderBy(col("KEY1"), col("KEY2").asc)).alias("Row_Num"))
baseDF.show()

+----+----+---+-------+
|KEY1|KEY2|VAL|Row_Num|
+----+----+---+-------+
|001 |a |abc|1 |
|001 |a |123|2 |
|002 |b |dfr|1 |
|003 |c |456|1 |
|003 |c |ytr|2 |
+----+----+---+-------+

现在增量负载如下:
val delta = List(List("001", "a", "y45") ,List("002", "b", "444"))
.map(row => (row(0), row(1), row(2)))

val DS2 = delta.toDF("KEY1", "KEY2" ,"VAL")
DS2.show()

+----+----+---+
|KEY1|KEY2|VAL|
+----+----+---+
| 001| a|y45|
| 002| b|444|
+----+----+---+

所以预期的更新结果应该是:
baseDF.show()

|KEY1|KEY2|VAL|Row_Num|
+----+----+---+-------+
|001 |a |abc|1 |
|001 |a |123|2 |
| 001| a|y45|3 | -----> Delta record
|002 |b |dfr|1 |
| 002| b|444|2 | -----> Delta record
|003 |c |456|1 |
|003 |c |ytr|2 |
+----+----+---+-------+

使用数据框/数据集实现此解决方案的任何建议?
我们可以使用 spark rdd 的 zipWithIndex 来实现上述解决方案吗?

最佳答案

添加具有更新行号的增量的一种方法是:1) 在 Row_Num 中添加具有大数字的列 DS2 ,2) 将 baseDF 与它并集,以及 3) 计算新的行号,如下所示:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val combinedDF = baseDF.union(
DS2.withColumn("Row_Num", lit(Long.MaxValue))
)

val resultDF = combinedDF.select(
col("KEY1"), col("KEY2"), col("VAL"), row_number().over(
Window.partitionBy(col("KEY1"), col("KEY2")).orderBy(col("Row_Num"))
).alias("New_Row_Num")
)

resultDF.show
+----+----+---+-----------+
|KEY1|KEY2|VAL|New_Row_Num|
+----+----+---+-----------+
| 003| c|456| 1|
| 003| c|ytr| 2|
| 002| b|dfr| 1|
| 002| b|444| 2|
| 001| a|abc| 1|
| 001| a|123| 2|
| 001| a|y45| 3|
+----+----+---+-----------+

关于scala - 为 Delta Data 更新 Spark Dataframe 的窗口函数 row_number 列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46584773/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com