java - Spark : Going reverse in dataframe until a condition met

Reposted. Author: 行者123. Updated: 2023-12-02 10:12:51

I have a dataframe in the following format:

+----------+-------+----------+---------+-----------------------+---------+---------+
|rownum    |viewid |skillid   |parentId |post_timestamp         |is_skill |column A |
+----------+-------+----------+---------+-----------------------+---------+---------+
|1         |251    |b         |xyz12    |2019-01-31 09:24:02.868|true     |abcde    |
|2         |251    |b         |abc34    |2019-01-31 10:24:02.868|false    |453aw    |
|3         |251    |b         |abc34    |2019-01-31 11:24:02.868|false    |abcde    |
|4         |94     |a         |ghi23    |2019-01-31 02:28:05.107|false    |bbbbb    |
|5         |94     |a         |yui67    |2019-01-31 09:06:57.976|true     |nnnn     |
|6         |94     |a         |qwe12    |2019-01-31 09:24:02.868|false    |2n21q    |
|7         |94     |a         |qwe12    |2019-01-31 10:06:57.976|false    |nnnnq    |
|8         |94     |a         |rty87    |2019-01-31 15:07:57.976|true     |1234     |
|9         |94     |a         |bnm22    |2019-01-31 16:28:05.107|true     |1234     |
|10        |94     |a         |bnm22    |2019-01-31 17:28:05.107|true     |6789     |
|11        |94     |b         |tyu12    |2019-01-31 09:24:02.868|true     |6789     |
+----------+-------+----------+---------+-----------------------+---------+---------+

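Within each group of viewid and skillid, if the current row's parentId is not equal to the previous row's parentId, I want to find the latest row in that group whose is_skill value is true, and then check whether the current row's column A value differs from that row's column A value.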

WindowSpec w = Window.partitionBy("viewid", "skillid").orderBy("post_timestamp");
Column matchedParentId = df.col("parentId").equalTo(functions.lag("parentId", 1).over(w));

Now, how can I walk backwards through the dataframe until is_skill is true? I assume going back is doable because the dataframe is ordered by timestamp.

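Best answer

I use Scala, but the solution I came up with is:

- Use window functions to find the row number of the last row with is_skill = true that precedes the row whose parentId differs from the previous parentId
- Self-join the dataframe to match the rows

Is the desired output as follows?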

+------+------+-------+--------+--------------------+--------+--------+---------------+--------+
|rownum|viewid|skillid|parentId|      post_timestamp|is_skill|column A|matchedParentId|isAEqual|
+------+------+-------+--------+--------------------+--------+--------+---------------+--------+
|     1|   251|      b|   xyz12|20190131 09:24:02...|    true|   abcde|           null|    true|
|     2|   251|      b|   abc34|20190131 10:24:02...|   false|   453aw|          false|   false|
|     3|   251|      b|   abc34|20190131 11:24:02...|   false|   abcde|           true|    true|
|     5|    94|      a|   yui67|20190131 09:06:57...|    true|    nnnn|          false|    true|
|     6|    94|      a|   qwe12|20190131 09:24:02...|   false|   2n21q|          false|   false|
|     7|    94|      a|   qwe12|20190131 10:06:57...|   false|   nnnnq|           true|   false|
|     8|    94|      a|   rty87|20190131 15:07:57...|    true|    1234|          false|    true|
|     9|    94|      a|   bnm22|20190131 16:28:05...|    true|    1234|          false|    true|
|    10|    94|      a|   bnm22|20190131 17:28:05...|    true|    6789|           true|    true|
|    11|    94|      b|   tyu12|20190131 09:24:02...|    true|    6789|           null|    true|
+------+------+-------+--------+--------------------+--------+--------+---------------+--------+

Here is the code:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._   // lag, rank, when, max
import spark.implicits._

// Sample data from the question
val df = Seq(
  (1,  251, "b", "xyz12", "20190131 09:24:02.868", true,  "abcde"),
  (2,  251, "b", "abc34", "20190131 10:24:02.868", false, "453aw"),
  (3,  251, "b", "abc34", "20190131 11:24:02.868", false, "abcde"),
  (4,  94,  "a", "ghi23", "20190131 02:28:05.107", false, "bbbbb"),
  (5,  94,  "a", "yui67", "20190131 09:06:57.976", true,  "nnnn"),
  (6,  94,  "a", "qwe12", "20190131 09:24:02.868", false, "2n21q"),
  (7,  94,  "a", "qwe12", "20190131 10:06:57.976", false, "nnnnq"),
  (8,  94,  "a", "rty87", "20190131 15:07:57.976", true,  "1234"),
  (9,  94,  "a", "bnm22", "20190131 16:28:05.107", true,  "1234"),
  (10, 94,  "a", "bnm22", "20190131 17:28:05.107", true,  "6789"),
  (11, 94,  "b", "tyu12", "20190131 09:24:02.868", true,  "6789")
).toDF("rownum", "viewid", "skillid", "parentId", "post_timestamp", "is_skill", "column A")

// Rows of each (viewid, skillid) group in timestamp order
val w = Window.partitionBy("viewid", "skillid").orderBy("post_timestamp")

// matchedParentId: does parentId equal the previous row's parentId?
// rank:            position of the row within its group
// test:            rank of the latest is_skill = true row at or before this row (0 if none)
val df2 = df.
  withColumn("matchedParentId", lag($"parentId", 1).over(w).equalTo($"parentId")).
  withColumn("rank", rank().over(w)).
  withColumn("is_skill_int", when($"is_skill", 1).otherwise(0)).
  withColumn("test", max($"is_skill_int" * $"rank").over(w))

// Self-join: df_right is the current row, df_left is the is_skill = true row it points to.
// Rows with test = 0 (no is_skill = true row yet) find no match and are dropped by the inner join.
val df3 = df2.as("df_left").
  join(df2.as("df_right"),
    $"df_left.viewid".equalTo($"df_right.viewid").
      and($"df_left.skillid".equalTo($"df_right.skillid")).
      and($"df_left.rank".equalTo($"df_right.test"))).
  withColumn("isAEqual", $"df_left.column A".equalTo($"df_right.column A")).
  select("df_right.rownum", "df_right.viewid", "df_right.skillid", "df_right.parentId",
    "df_right.post_timestamp", "df_right.is_skill", "df_right.column A",
    "df_right.matchedParentId", "isAEqual").
  orderBy("rownum")

df3.show
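
Since the question was asked from Java, the lookup rule behind the isAEqual column can also be checked without Spark. The following is only a plain-Java sketch for a single (viewid, skillid) group that is already sorted by post_timestamp. The class name LastSkillLookup and the method isAEqual are hypothetical names made up for this illustration and are not part of any Spark API. As in the Scala code, a row with is_skill = true counts as its own latest match. A row with no is_skill = true row at or before it yields null here, whereas the inner join above drops such a row.

```java
import java.util.Arrays;

// Hypothetical helper for illustration only; not a Spark class.
public class LastSkillLookup {

    /**
     * For each row of one group (sorted by post_timestamp), compares its "column A"
     * value with that of the latest row at or before it whose is_skill flag is true.
     * The entry is null while no is_skill = true row has been seen.
     */
    public static Boolean[] isAEqual(boolean[] isSkill, String[] columnA) {
        Boolean[] result = new Boolean[isSkill.length];
        String lastSkillValue = null;   // "column A" of the latest is_skill = true row so far
        boolean seenSkillRow = false;
        for (int i = 0; i < isSkill.length; i++) {
            if (isSkill[i]) {           // the current row itself counts as a match
                lastSkillValue = columnA[i];
                seenSkillRow = true;
            }
            result[i] = seenSkillRow ? columnA[i].equals(lastSkillValue) : null;
        }
        return result;
    }

    public static void main(String[] args) {
        // Rows 4 to 10 of the sample data: the (viewid = 94, skillid = a) group.
        boolean[] isSkill = {false, true, false, false, true, true, true};
        String[] columnA = {"bbbbb", "nnnn", "2n21q", "nnnnq", "1234", "1234", "6789"};
        System.out.println(Arrays.toString(isAEqual(isSkill, columnA)));
        // prints [null, true, false, false, true, true, true]
    }
}
```

The non-null entries agree with the isAEqual values shown for rows 5 to 10 in the output table, and the leading null corresponds to row 4, which is absent from that table.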

Regarding "java - Spark : Going reverse in dataframe until a condition met", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/54870111/
