gpt4 book ai didi

java - 在 Spark 中实现 SCD 类型 2

转载 作者:行者123 更新时间:2023-12-02 09:08:38 27 4
gpt4 key购买 nike

尝试在 Spark 2.4.4 中实现 SCD Type 2 逻辑。我有两个数据框;一个包含“现有数据”,另一个包含“新传入数据”。

下面给出了输入和预期输出。需要发生的是:

  1. 所有传入行都应附加到现有数据中。

  2. 只有以下 3 行之前处于“Activity ”状态才应变为非 Activity 状态,并填充相应的“endDate”,如下所示:

    pk=1, amount = 20 => 该行应变为“非 Activity ”且“endDate”是下一行(Lead)的“startDate”

    pk=2, amount = 100 => 该行应变为“非 Activity 状态”且“endDate”是下一行(潜在客户)的“startDate”

    pk=3, amount = 750 => 该行应变为“非 Activity 状态”且“endDate”是下一行(潜在客户)的“startDate”

如何在 Spark 中执行此操作?

现有数据:

+---+------+-------------------+-------------------+------+
| pk|amount| startDate| endDate|active|
+---+------+-------------------+-------------------+------+
| 1| 10|2019-01-01 12:00:00|2019-01-20 05:00:00| 0|
| 1| 20|2019-01-20 05:00:00| null| 1|
| 2| 100|2019-01-01 00:00:00| null| 1|
| 3| 75|2019-01-01 06:00:00|2019-01-26 08:00:00| 0|
| 3| 750|2019-01-26 08:00:00| null| 1|
| 10| 40|2019-01-01 00:00:00| null| 1|
+---+------+-------------------+-------------------+------+

新传入数据:

+---+------+-------------------+-------------------+------+
| pk|amount| startDate| endDate|active|
+---+------+-------------------+-------------------+------+
| 1| 50|2019-02-01 07:00:00|2019-02-02 08:00:00| 0|
| 1| 75|2019-02-02 08:00:00| null| 1|
| 2| 200|2019-02-01 05:00:00|2019-02-01 13:00:00| 0|
| 2| 60|2019-02-01 13:00:00|2019-02-01 19:00:00| 0|
| 2| 500|2019-02-01 19:00:00| null| 1|
| 3| 175|2019-02-01 00:00:00| null| 1|
| 4| 50|2019-02-02 12:00:00|2019-02-02 14:00:00| 0|
| 4| 300|2019-02-02 14:00:00| null| 1|
| 5| 500|2019-02-02 00:00:00| null| 1|
+---+------+-------------------+-------------------+------+

预期输出:

+---+------+-------------------+-------------------+------+
| pk|amount| startDate| endDate|active|
+---+------+-------------------+-------------------+------+
| 1| 10|2019-01-01 12:00:00|2019-01-20 05:00:00| 0|
| 1| 20|2019-01-20 05:00:00|2019-02-01 07:00:00| 0|
| 1| 50|2019-02-01 07:00:00|2019-02-02 08:00:00| 0|
| 1| 75|2019-02-02 08:00:00| null| 1|
| 2| 100|2019-01-01 00:00:00|2019-02-01 05:00:00| 0|
| 2| 200|2019-02-01 05:00:00|2019-02-01 13:00:00| 0|
| 2| 60|2019-02-01 13:00:00|2019-02-01 19:00:00| 0|
| 2| 500|2019-02-01 19:00:00| null| 1|
| 3| 75|2019-01-01 06:00:00|2019-01-26 08:00:00| 0|
| 3| 750|2019-01-26 08:00:00|2019-02-01 00:00:00| 1|
| 3| 175|2019-02-01 00:00:00| null| 1|
| 4| 50|2019-02-02 12:00:00|2019-02-02 14:00:00| 0|
| 4| 300|2019-02-02 14:00:00| null| 1|
| 5| 500|2019-02-02 00:00:00| null| 1|
| 10| 40|2019-01-01 00:00:00| null| 1|
+---+------+-------------------+-------------------+------+

最佳答案

您可以首先从新 DataFrame 中为每个组 pk 选择第一个 startDate,然后与旧 DataFrame 合并以更新所需的列。然后,您可以合并所有连接结果和新的 DataFrame。

类似这样的事情:

// get first state by date for each pk group
val w = Window.partitionBy($"pk").orderBy($"startDate")
val updates = df_new.withColumn("rn", row_number.over(w)).filter("rn = 1").select($"pk", $"startDate")

// join with old data and update old values when there is match
val joinOldNew = df_old.join(updates.alias("new"), Seq("pk"), "left")
.withColumn("endDate", when($"endDate".isNull && $"active" === lit(1) && $"new.startDate".isNotNull, $"new.startDate").otherwise($"endDate"))
.withColumn("active", when($"endDate".isNull && $"active" === lit(1) && $"new.startDate".isNotNull, lit(0)).otherwise($"active"))
.drop($"new.startDate")

// union all
val result = joinOldNew.unionAll(df_new)

关于java - 在 Spark 中实现 SCD 类型 2,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59586700/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com