gpt4 book ai didi

scala - Spark SQL 按连续整数序列分组

转载 作者:行者123 更新时间:2023-12-02 20:47:37 25 4
gpt4 key购买 nike

所以我有一个表,我想从中创建事件。我的用户正在观看一个定义为 sub_parts 列表的视频,并为请求的每个 sub_part 下载字节。

例如,Alice正在观看一个5秒15部分的视频,她看了前三部分,然后跳到了第七部分,又播放了两部分,但最终她没有看完视频。

因此,我想使用 Spark SQL 为每个用户重新创建此事件跟踪(很可能是 UDF,但请帮助我,我不明白如何使其工作)

+---+------------+-------------+-------------+
| | Name | Video_part | Bytes Dl |
+---+------------+-------------+-------------+
| 1 | Alice | 1 | 200 |
| 2 | Alice | 2 | 250 |
| 3 | Alice | 3 | 400 |
| 1 | Alice | 7 | 100 |
| 2 | Alice | 8 | 200 |
| 3 | Bob | 1 | 1000 |
| 1 | Bob | 32 | 500 |
| 2 | Bob | 33 | 400 |
| 3 | Bob | 34 | 330 |
| 1 | Bob | 15 | 800 |
| 2 | Bob | 16 | 400 |
+---+------------+-------------+-------------+

所以我想要的是按 video_part 中的连续整数进行分组,这些整数是我的事件 play,当这个连续列表中有中断时,这就是一个事件 skin_inskip_out,对于播放的每个部分,我也想获得下载字节的平均值:

+---+------------+-------------+-------------+-------------+-------------+
| | Name | Number_play | Event | Number_skips| Mean_BytesDL|
+---+------------+-------------+-------------+-------------+-------------+
| 1 | Alice | 3 | Play | 0 | 283,3 |
| 2 | Alice | 0 | Skip_in | 4 | 0 |
| 3 | Alice | 2 | Play | 0 | 150 |
| 1 | Bob | 1 | Play | 0 | 1000 |
| 2 | Bob | 0 | Skip_in | 31 | 0 |
| 3 | Bob | 3 | Play | 0 | 410 |
| 2 | Bob | 0 | Skip_out | 19 | 0 |
| 3 | Bob | 2 | Play | 0 | 600 |
+---+------------+-------------+-------------+-------------+-------------+

问题是我可以在 Python 或 Scala 中分别使用带有循环的 sub_pandas df 或带有 map 和 foreach 的子列表来完成此操作,但是在 1 To 数据上运行它需要很长时间。即使我在我的节点集群上运行它。

所以我想知道是否有一种方法可以在 Spark SQL 中做到这一点,我已经用 Groupby、flatMap 或 Agg 对 UDF 进行了一些研究。但我遇到了麻烦,因为这对我来说是全新的,希望你能以某种方式帮助我!

我在想这样的事情:

  • 按名称排序
  • 通过每个独特的名称:
  • 使用 UDF 聚合 video_part -> 这会创建三个新列其中该部分的 bytesDL 平均值

我知道这非常具体,但也许有人可以帮助我,

提前致谢,祝你有美好的一天!

最佳答案

我有一个更简单的方法。我先用文字解释一下。当有序行中有连续整数时,会发生以下情况:对于每行,row_number 与行中的值之间的差异保持不变。因此,在按 Video_part 排序的 Name 的窗口函数中添加一个附加列,例如“diff”,在其中放置 Video_part - row_number()。这将使具有连续值的所有行在此列中具有相同的值。然后,您只需 groupBy("Name", "diff") 即可获得所需的组。正确的?举一个简单的例子,想象一下“value”列中的有序数字列表及其相应的(值 - 行索引)添加列“diff”:

+----+------+-----+
|row |value |diff |
+----+------+-----+
|0 |2 |2 |
|1 |3 |2 |
|2 |4 |2 |
|3 |7 |4 |
|4 |8 |4 |
|5 |23 |18 |
|6 |24 |18 |
+----+------+-----+

因此,按 diff 分组将使您获得具有连续值的行组

应用此功能需要能够提供有序行列表中的行号。 Windows 可以做到这一点。现在看代码:

val win = Window.partitionBy("Name").orderBy("Video_part")
df.withColumn("diff", $"Video_part" - row_number().over(win))

然后只需按“diff”和“Name”分组并根据需要进行聚合

关于scala - Spark SQL 按连续整数序列分组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43678311/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com