gpt4 book ai didi

python - Pyspark Spark DataFrame - 聚合和过滤 map 类型列中的列

转载 作者:太空宇宙 更新时间:2023-11-04 05:02:09 25 4
gpt4 key购买 nike

我的 DataFrame 看起来像:

| c1 | c2|  c3  |
|----+---+-------
| A | b | 22:00|
| A | b | 23:00|
| A | b | 09:00|
| A | c | 22:00|
| B | c | 09:30|

我想执行一些聚合并创建第二个具有 3 列的 DataFrame:

c1:是我要分组的列。

ma​​p_category_room_date: map 类型,输入c2,在c3中输入较低/最小值。

cnt_orig:是对原始组有多少行的计数。

结果

|    c1    |  map_category_room_date | cnt_orig |
|----------+-------------------------+----------|
| 'A' |{'b': 09:00, 'C': 22:00} | 4 |
| 'B' |{'c': 09:30} | 1 |

我可以使用什么聚合函数来归档这是最简单的方法?

谢谢

最佳答案

您可以使用window函数生成count,然后使用内置函数按照以下步骤获得您想要的最终数据帧

from pyspark.sql import Window
windowSpec = Window.partitionBy("c1")

from pyspark.sql import functions as F
df.withColumn("cnt_orig", count('c1').over(windowSpec)).orderBy('c3').groupBy("c1", "c2", "cnt_orig").agg(first('c3').as('c3'))
.withColumn("c2", F.regexp_replace(F.regexp_replace(F.array($"c2", $"c3").cast(StringType), "[\\[\\]]", ""), ",", " : "))
.groupBy("c1", "cnt_orig").agg(F.collect_list("c2").as('map_category_room_date'))

你应该得到如下结果

+---+--------+----------------------+
|c1 |cnt_orig|map_category_room_date|
+---+--------+----------------------+
|A |4 |[b : 09:00, c : 22:00]|
|b |1 |[c : 09:00] |
+---+--------+----------------------+

Scala 方式

在 scala 中获得所需输出的工作代码是

val windowSpec = Window.partitionBy("c1")

df.withColumn("cnt_orig", count("c1").over(windowSpec)).orderBy("c3").groupBy("c1", "c2", "cnt_orig").agg(first("c3").as("c3"))
.withColumn("c2", regexp_replace(regexp_replace(array($"c2", $"c3").cast(StringType), "[\\[\\]]", ""), ",", " : "))
.groupBy("c1", "cnt_orig").agg(collect_list("c2").as("map_category_room_date"))

关于python - Pyspark Spark DataFrame - 聚合和过滤 map 类型列中的列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45445077/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com