gpt4 book ai didi

scala - 将 Spark 2.3.1 与 Scala 结合使用,将任意日期范围列表缩减为不同的不重叠日期范围

转载 作者:行者123 更新时间:2023-12-04 18:56:10 26 4
gpt4 key购买 nike

给定日期范围列表,其中一些重叠:

val df = Seq(
("Mike","2018-09-01","2018-09-10"), // range 1
("Mike","2018-09-05","2018-09-05"), // range 1
("Mike","2018-09-12","2018-09-12"), // range 1
("Mike","2018-09-11","2018-09-11"), // range 1
("Mike","2018-09-25","2018-09-29"), // range 4
("Mike","2018-09-21","2018-09-23"), // range 4
("Mike","2018-09-24","2018-09-24"), // range 4
("Mike","2018-09-14","2018-09-16"), // range 2
("Mike","2018-09-15","2018-09-17"), // range 2
("Mike","2018-09-05","2018-09-05"), // range 1
("Mike","2018-09-19","2018-09-19"), // range 3
("Mike","2018-09-19","2018-09-19"), // range 3
("Mike","2018-08-19","2018-08-20"), // range 5
("Mike","2018-10-01","2018-10-20"), // range 6
("Mike","2018-10-10","2018-10-30") // range 6
).toDF("name", "start", "end")

我想将数据减少到完全封装上述日期且不添加额外日期的最小日期范围集:
+----+----------+----------+                                                    
|name|start |end |
+----+----------+----------+
|Mike|2018-09-01|2018-09-12|
|Mike|2018-09-14|2018-09-17|
|Mike|2018-09-19|2018-09-19|
|Mike|2018-09-21|2018-09-29|
|Mike|2018-08-19|2018-08-20|
|Mike|2018-10-01|2018-10-30|
+----+----------+----------+

编辑:向测试数据添加了三个新条目以说明新的边缘情况。

我不能依赖任何特定顺序的日期。

到目前为止,我在这方面的最佳尝试:
  • 将每个日期范围分解为一组单独的日期
  • 将这些集合合并成一个大集合
  • 将集合排序到列表中,以便按顺序排列日期
  • 将单独的天聚合回天列表。
  • 将每个列表的第一天和最后一天作为新范围。

  • 代码,例如:
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.Row
    import scala.collection.immutable.NumericRange
    import java.time.LocalDate

    case class MyRange(start:String, end:String)

    val combineRanges = udf((ranges: Seq[Row]) => {
    ranges.map(r => LocalDate.parse(r(0).toString).toEpochDay to LocalDate.parse(r(1).toString).toEpochDay)
    .map(_.toIndexedSeq).reduce(_ ++ _).distinct.toList.sorted
    .aggregate(List.empty[Vector[Long]])((ranges:List[Vector[Long]], d:Long) => {
    ranges.lastOption.find(_.last + 1 == d) match {
    case Some(r:Vector[Long]) => ranges.dropRight(1) :+ (r :+ d)
    case None => ranges :+ Vector(d)
    }
    }, _ ++ _).map(v => MyRange(LocalDate.ofEpochDay(v.head).toString, LocalDate.ofEpochDay(v.last).toString))
    })

    df.groupBy("name")
    .agg(combineRanges(collect_list(struct($"start", $"end"))) as "ranges")
    .withColumn("ranges", explode($"ranges"))
    .select($"name", $"ranges.start", $"ranges.end")
    .show(false)

    它似乎有效,但它非常丑陋,可能浪费时间和内存。

    我有点希望使用 scala Range 类只是在理论上将日期范围分解为各自的日期,但我有一种感觉,排序操作会强制 scala 的手并使其实际上创建内存中所有日期的列表。

    有没有人有更好的方法来做到这一点?

    最佳答案

    我认为最简单(也是最易读)的方法是将范围分解为单独的日期,然后聚合回间隔。由于天数不能增长太多,我认为爆炸不是这里的瓶颈。我展示了一个“纯 Scala”解决方案,然后在 UDF 中使用该解决方案,该解决方案从 collect_list 获取所有间隔。聚合:

    import java.time.LocalDate
    import java.time.temporal.ChronoUnit

    def enumerateDays(start: LocalDate, end: LocalDate) = {
    Iterator.iterate(start)(d => d.plusDays(1L))
    .takeWhile(d => !d.isAfter(end))
    .toList
    }

    implicit val localDateOrdering: Ordering[LocalDate] = Ordering.by(_.toEpochDay)

    val combineRanges = udf((data: Seq[Row]) => {
    val dateEnumerated =
    data
    .toSet[Row] // use Set to save memory if many spans overlap
    // "explode" date spans into individual days
    .flatMap { case Row(start: String, end: String) => enumerateDays(LocalDate.parse(start), LocalDate.parse(end)) }
    .toVector
    .sorted

    // combine subsequent dates into Vectors
    dateEnumerated.tail
    // combine subsequent dates into Vectors
    .foldLeft(Vector(Vector(dateEnumerated.head)))((agg, curr) => {
    if (ChronoUnit.DAYS.between(agg.last.last, curr) == 1L) {
    agg.init :+ (agg.last :+ curr)
    } else {
    agg :+ Vector(curr)
    }
    })
    // now get min/max of dates per span
    .map(r => (r.min.toString, r.max.toString))
    })

    df.groupBy("name")
    .agg(combineRanges(collect_list(struct($"start", $"end"))) as "ranges")
    .withColumn("ranges", explode($"ranges"))
    .select($"name", $"ranges._1".as("start"), $"ranges._2".as("end"))
    .show(false)


    +----+----------+----------+
    |name|start |end |
    +----+----------+----------+
    |Mike|2018-08-19|2018-08-20|
    |Mike|2018-09-01|2018-09-12|
    |Mike|2018-09-14|2018-09-17|
    |Mike|2018-09-19|2018-09-19|
    |Mike|2018-09-21|2018-09-29|
    |Mike|2018-10-01|2018-10-30|
    +----+----------+----------+

    我认为使用更多逻辑 DataFrame API 也是可行的。我仍然会使用 UDF 爆炸,但随后使用 Window-Functions 和 groupBy 根据 2 个日期之间的天数构建新块。不过我觉得上面的解决方案也可以

    关于scala - 将 Spark 2.3.1 与 Scala 结合使用,将任意日期范围列表缩减为不同的不重叠日期范围,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53030133/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com