gpt4 book ai didi

scala - Spark /斯卡拉: forward fill with last observation

转载 作者:行者123 更新时间:2023-12-03 10:29:08 24 4
gpt4 key购买 nike

使用 Spark 1.4.0、Scala 2.10

我一直在想办法用最后一次已知的观察结果来向前填充空值,但我没有看到一种简单的方法。我认为这是一件很常见的事情,但找不到显示如何执行此操作的示例。

我看到了用一个值向前填充 NaN 的函数,或者用一个偏移量填充或移动数据的滞后/领先函数,但没有任何东西可以获取最后一个已知值。

在网上看,我在 R 中看到很多关于同一件事的问答,但在 Spark/Scala 中没有。

我正在考虑在日期范围内映射,从结果中过滤出 NaN 并选择最后一个元素,但我想我对语法感到困惑。

使用 DataFrames 我尝试类似

import org.apache.spark.sql.expressions.Window

val sqlContext = new HiveContext(sc)

var spec = Window.orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")

val df2 = df.withColumn("testForwardFill", (90 to 0).map(i=>lag(df.col("myValue"),i,0).over(spec)).filter(p=>p.getItem.isNotNull).last)

但这并没有让我去任何地方。

过滤器部分不起作用; map 函数返回一个 spark.sql.Columns 序列,但过滤器函数期望返回一个 bool 值,所以我需要从 Column 中获取一个值进行测试,但似乎只有 Column 方法返回一个 Column。

有没有办法在 Spark 上更“简单”地做到这一点?

感谢您的输入

编辑 :

简单示例示例输入:

2015-06-01,33
2015-06-02,
2015-06-03,
2015-06-04,
2015-06-05,22
2015-06-06,
2015-06-07,
...

预期输出:

2015-06-01,33
2015-06-02,33
2015-06-03,33
2015-06-04,33
2015-06-05,22
2015-06-06,22
2015-06-07,22

备注 :
  • 我有很多列,其中很多都有这种缺失的数据模式,但不是在同一日期/时间。如果需要,我将一次转换一列。

  • 编辑 :

    按照@zero323 的回答,我尝试了这种方式:
        import org.apache.spark.sql.Row
    import org.apache.spark.rdd.RDD

    val rows: RDD[Row] = df.orderBy($"Date").rdd


    def notMissing(row: Row): Boolean = { !row.isNullAt(1) }

    val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows.mapPartitionsWithIndex{
    case (i, iter) => Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }
    .collectAsMap

    val toCarryBd = sc.broadcast(toCarry)

    def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = { if (iter.contains(null)) iter.map(row => Row(toCarryBd.value(i).get(1))) else iter }

    val imputed: RDD[Row] = rows.mapPartitionsWithIndex{ case (i, iter) => fill(i, iter)}

    广播变量最终是一个没有空值的值列表。这是进步,但我仍然无法使映射工作。
    但我什么也没得到,因为索引 i在不映射到原始数据时,它映射到没有空值的子集。

    我在这里缺少什么?

    编辑和解决方案(从@zero323 的答案推断):
    import org.apache.spark.sql.expressions.Window

    val sqlContext = new HiveContext(sc)

    var spec = Window.partitionBy("id").orderBy("Date")
    val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")

    val df2 = df.withColumn("test", coalesce((0 to 90).map(i=>lag(df.col("test"),i,0).over(spec)): _*))

    如果您使用的是 RDD 而不是 DataFrame,请参阅下面的 zero323 答案以获取更多选项。上面的解决方案可能不是最有效的,但对我有用。如果您想优化,请查看 RDD 解决方案。

    最佳答案

    初始答案(单一时间序列假设):

    如果你不能提供PARTITION BY,首先尽量避免使用窗口函数。条款。它将数据移动到单个分区,因此大多数时候它根本不可行。

    您可以做的是填补 RDD 上的空白。使用 mapPartitionsWithIndex .由于您没有提供示例数据或预期输出,请认为这是伪代码而不是真正的 Scala 程序:

  • 首先让我们订购 DataFrame按日期并转换为 RDD
    import org.apache.spark.sql.Row
    import org.apache.spark.rdd.RDD

    val rows: RDD[Row] = df.orderBy($"Date").rdd
  • 接下来让我们找到每个分区的最后一个非空观察
    def notMissing(row: Row): Boolean = ???

    val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows
    .mapPartitionsWithIndex{ case (i, iter) =>
    Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }
    .collectAsMap
  • 并转换此 Map广播
    val toCarryBd = sc.broadcast(toCarry)
  • 最后映射分区再次填补空白:
    def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = {
    // If it is the beginning of partition and value is missing
    // extract value to fill from toCarryBd.value
    // Remember to correct for empty / only missing partitions
    // otherwise take last not-null from the current partition
    }

    val imputed: RDD[Row] = rows
    .mapPartitionsWithIndex{ case (i, iter) => fill(i, iter) }
  • 最后转换回 DataFrame

  • 编辑(每组数据的分区/时间序列):

    魔鬼在细节中。如果您的数据毕竟是分区的,那么可以使用 groupBy 解决整个问题。 .假设您只是按 T 类型的“v”列进行分区和 Date是一个整数时间戳:
    def fill(iter: List[Row]): List[Row] = {
    // Just go row by row and fill with last non-empty value
    ???
    }

    val groupedAndSorted = df.rdd
    .groupBy(_.getAs[T]("k"))
    .mapValues(_.toList.sortBy(_.getAs[Int]("Date")))

    val rows: RDD[Row] = groupedAndSorted.mapValues(fill).values.flatMap(identity)

    val dfFilled = sqlContext.createDataFrame(rows, df.schema)

    这样您就可以同时填充所有列。

    Can this be done with DataFrames instead of converting back and forth to RDD?



    这取决于,虽然它不太可能有效。如果最大差距相对较小,您可以执行以下操作:
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions.{WindowSpec, Window}
    import org.apache.spark.sql.Column

    val maxGap: Int = ??? // Maximum gap between observations
    val columnsToFill: List[String] = ??? // List of columns to fill
    val suffix: String = "_" // To disambiguate between original and imputed

    // Take lag 1 to maxGap and coalesce
    def makeCoalesce(w: WindowSpec)(magGap: Int)(suffix: String)(c: String) = {
    // Generate lag values between 1 and maxGap
    val lags = (1 to maxGap).map(lag(col(c), _)over(w))
    // Add current, coalesce and set alias
    coalesce(col(c) +: lags: _*).alias(s"$c$suffix")
    }


    // For each column you want to fill nulls apply makeCoalesce
    val lags: List[Column] = columnsToFill.map(makeCoalesce(w)(maxGap)("_"))


    // Finally select
    val dfImputed = df.select($"*" :: lags: _*)

    可以轻松调整以使用每列不同的最大间隙。

    在最新的 Spark 版本中实现类似结果的更简单方法是使用 lastignoreNulls :
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions.Window

    val w = Window.partitionBy($"k").orderBy($"Date")
    .rowsBetween(Window.unboundedPreceding, -1)

    df.withColumn("value", coalesce($"value", last($"value", true).over(w)))

    虽然有可能掉落 partitionBy条款并在全局范围内应用此方法,对于大型数据集来说,它的成本会高得惊人。

    关于scala - Spark /斯卡拉: forward fill with last observation,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33621319/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com