gpt4 book ai didi

scala - Spark /斯卡拉: fill nan with last good observation

转载 作者:行者123 更新时间:2023-12-04 03:16:25 26 4
gpt4 key购买 nike

我正在使用 spark 2.0.1 并想用列中的最后一个已知值填充 nan 值。

我能找到的关于 spark 的唯一引用 Spark / Scala: forward fill with last observationFill in null with previously known good value with pyspark似乎使用RDD。

我宁愿留在数据框/数据集世界中,并可能处理多个 nan 值。
这可能吗?

我的假设是数据(最初从例如 CSV 文件加载是按时间排序的,并且该顺序保留在分布式设置中,例如通过关闭/最后一个已知值填充是正确的。对于大多数情况,填充前一个值就足够了记录连续没有 2 个或更多 nan 记录。这实际上成立吗?
关键是一个

myDf.sort("foo").show

会破坏任何订单,例如全部 null值(value)将是第一位的。

一个小例子:
import java.sql.{ Date, Timestamp }
case class FooBar(foo:Date, bar:String)
val myDf = Seq(("2016-01-01","first"),("2016-01-02","second"),("2016-wrongFormat","noValidFormat"), ("2016-01-04","lastAssumingSameDate"))
.toDF("foo","bar")
.withColumn("foo", 'foo.cast("Date"))
.as[FooBar]

结果是
+----------+--------------------+
| foo| bar|
+----------+--------------------+
|2016-01-01| first|
|2016-01-02| second|
| null| noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+

我想用最后一个已知的值来修复该值。我怎样才能做到这一点?
+----------+--------------------+
| foo| bar|
+----------+--------------------+
|2016-01-01| first|
|2016-01-02| second|
|2016-01-02| noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+

编辑

在我的情况下,填充上一行的值就足够了,因为只有非常有限的错误值。

编辑2

我尝试添加索引列
val myDf = Seq(("2016-01-01", "first"), ("2016-01-02", "second"), ("2016-wrongFormat", "noValidFormat"), ("2016-01-04", "lastAssumingSameDate"))
.toDF("foo", "bar")
.withColumn("foo", 'foo.cast("Date"))
.as[FooBar]
.withColumn("rowId", monotonically_increasing_id())

然后用最后一个值填充。
myDf.withColumn("fooLag", lag('foo, 1) over Window.orderBy('rowId)).show

但这会显示以下警告:
没有为窗口操作定义分区!将所有数据移动到单个分区,这会导致严重的性能下降。我如何引入有意义的分区?
+----------+--------------------+-----+----------+
| foo| bar|rowId| fooLag|
+----------+--------------------+-----+----------+
|2016-01-01| first| 0| null|
|2016-01-02| second| 1|2016-01-01|
| null| noValidFormat| 2|2016-01-02|
|2016-01-04|lastAssumingSameDate| 3| null|
+----------+--------------------+-----+----------+

最佳答案

//用最后一个未知的空值填充空字段
我试过了,这确实有效!!

val dftxt1 = spark.read.option("header","true").option("sep","\t").csv("/sdata/ph/com/r/ph_com_r_ita_javelin/inbound/abc.txt").toDF("line_name", "merge_key", "line_id")
dftxt2.select("line_name","merge_key","line_id").write.mode("overwrite").insertInto("dbname.tablename")

val df = spark.sql("select * from dbname.tablename")

val Df1 = df.withColumn("rowId", monotonically_increasing_id())

import org.apache.spark.sql.expressions.Window

val partitionWindow = Window.orderBy("rowId")

val Df2 = Df1.withColumn("line_id", last("line_id", true) over (partitionWindow))

Df2.show

关于scala - Spark /斯卡拉: fill nan with last good observation,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40592207/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com