gpt4 book ai didi

scala - 带当前行条件的 Spark 窗口函数

转载 作者:行者123 更新时间:2023-12-03 23:08:51 25 4
gpt4 key购买 nike

我正在尝试计算给定的 order_id过去 365 天内有多少订单已付款。这不是问题:我使用 window function .

对我来说比较棘手的地方是:我不想在 payment_date 的这个时间窗口计算订单。是在 order_date 之后当前order_id .

目前,我有这样的事情:

val window: WindowSpec = Window
.partitionBy("customer_id")
.orderBy("order_date")
.rangeBetween(-365*days, -1)


df.withColumn("paid_order_count", count("*") over window)

这将计算客户在当前订单之前的最后 365 天内的所有订单。

我现在如何为采用 order_date 的计数合并一个条件考虑当前订单?

例子:
+---------+-----------+-------------+------------+
|order_id |order_date |payment_date |customer_id |
+---------+-----------+-------------+------------+
|1 |2017-01-01 |2017-01-10 |A |
|2 |2017-02-01 |2017-02-10 |A |
|3 |2017-02-02 |2017-02-20 |A |

结果表应如下所示:
+---------+-----------+-------------+------------+-----------------+
|order_id |order_date |payment_date |customer_id |paid_order_count |
+---------+-----------+-------------+------------+-----------------+
|1 |2017-01-01 |2017-01-10 |A |0 |
|2 |2017-02-01 |2017-02-10 |A |1 |
|3 |2017-02-02 |2017-02-20 |A |1 |

对于 order_id = 3 paid_order_count不应该是 2但是 1order_id = 2order_id = 3 之后支付被放置。

我希望我能很好地解释我的问题,并期待您的想法!

最佳答案

很好的问题!!!
一些评论,使用 范围介于 之间创建一个基于其中的行数而不是值的固定框架,因此在两种情况下会出现问题:

  • 客户并非每天都有订单,因此 365 行窗口可能包含一年前带有 order_date 的行
  • 如果客户每天有一个以上的订单,一年的保修期就会困惑
  • 1 和 2 的组合

  • 还有 范围介于 之间不适用于日期和时间戳数据类型。

    为了解决这个问题,可以使用带有列表和 UDF 的窗口函数:
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions.Window

    val df = spark.sparkContext.parallelize(Seq(
    (1, "2017-01-01", "2017-01-10", "A")
    , (2, "2017-02-01", "2017-02-10", "A")
    , (3, "2017-02-02", "2017-02-20", "A")
    )
    ).toDF("order_id", "order_date", "payment_date", "customer_id")
    .withColumn("order_date_ts", to_timestamp($"order_date", "yyyy-MM-dd").cast("long"))
    .withColumn("payment_date_ts", to_timestamp($"payment_date", "yyyy-MM-dd").cast("long"))

    // df.printSchema()
    // df.show(false)

    val window = Window.partitionBy("customer_id").orderBy("order_date_ts").rangeBetween(Window.unboundedPreceding, -1)

    val count_filtered_dates = udf( (days: Int, top: Long, array: Seq[Long]) => {
    val bottom = top - (days * 60 * 60 * 24).toLong // in spark timestamps are in secconds, calculating the date days ago
    array.count(v => v >= bottom && v < top)
    }
    )

    val res = df.withColumn("paid_orders", collect_list("payment_date_ts") over window)
    .withColumn("paid_order_count", count_filtered_dates(lit(365), $"order_date_ts", $"paid_orders"))

    res.show(false)

    输出:
    +--------+----------+------------+-----------+-------------+---------------+------------------------+----------------+
    |order_id|order_date|payment_date|customer_id|order_date_ts|payment_date_ts|paid_orders |paid_order_count|
    +--------+----------+------------+-----------+-------------+---------------+------------------------+----------------+
    |1 |2017-01-01|2017-01-10 |A |1483228800 |1484006400 |[] |0 |
    |2 |2017-02-01|2017-02-10 |A |1485907200 |1486684800 |[1484006400] |1 |
    |3 |2017-02-02|2017-02-20 |A |1485993600 |1487548800 |[1484006400, 1486684800]|1 |
    +--------+----------+------------+-----------+-------------+---------------+------------------------+----------------+

    以秒为单位将日期转换为 Spark 时间戳可以使列表的内存效率更高。

    这是最容易实现的代码,但不是最优化的,因为列表会占用一些内存,自定义 UDAF 是最好的,但需要更多的编码,以后可能会做。如果每个客户有数千个订单,这仍然有效。

    关于scala - 带当前行条件的 Spark 窗口函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52895445/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com