
apache-spark - Pyspark Window Functions: Counting number of categorical variables and calculating percentages


I have a DataFrame in the following format. Each product has an ID, a product name, and a type.

ID  Prod Name  Type  Total Qty
1   ABC        A     200
1   DEF        B     350
1   GEH        B     120
1   JIK        C     100
1   LMO        A     40
2   ABC        A     10
2   DEF        A     20
2   GEH        C     30
2   JIK        C     40
2   LMO        A     50
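
For reference, here is a minimal sketch that recreates this sample data as a PySpark DataFrame (the underscored column names Prod_Name and Total_Qty are an assumption, chosen to match the identifiers used in the answer below):

from pyspark.sql import SparkSession

# Hypothetical setup: rebuild the sample data shown above.
sparkSession = SparkSession.builder.getOrCreate()
input_df = sparkSession.createDataFrame(
    [
        (1, "ABC", "A", 200), (1, "DEF", "B", 350), (1, "GEH", "B", 120),
        (1, "JIK", "C", 100), (1, "LMO", "A", 40),
        (2, "ABC", "A", 10), (2, "DEF", "A", 20), (2, "GEH", "C", 30),
        (2, "JIK", "C", 40), (2, "LMO", "A", 50),
    ],
    ["ID", "Prod_Name", "Type", "Total_Qty"],
)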

I am trying to get, in separate columns, the percentage of the total quantity that each type (A, B, C) represents for each ID. As a first step I tried a window function, but because it partitions by Type it counts each type across the entire DataFrame rather than within each ID:

df.withColumn("count_cat", F.count("Type").over(Window.partitionBy("Type")))

What I need instead is something like this:

ID  total Products  Total Qty  % of A  % of B  % of C
1   5               810        0.29    0.58    0.12

Best Answer

Approach 1: Group-by aggregation

Based on your expected output, an aggregation with GROUP BY ID is sufficient.

Assuming your initial dataset is stored in a DataFrame called input_df, you can achieve this as follows.

Using Spark SQL

  1. Make your DataFrame accessible by creating a temporary view:
     input_df.createOrReplaceTempView("input_df")
  2. Run the SQL below in your Spark session:
output_df = sparkSession.sql("""
    SELECT
        ID,
        COUNT(Prod_Name) as `total products`,
        SUM(Total_Qty) as `Total Qty`,
        SUM(CASE WHEN Type='A' THEN Total_Qty END) / SUM(Total_Qty) as `% of A`,
        SUM(CASE WHEN Type='B' THEN Total_Qty END) / SUM(Total_Qty) as `% of B`,
        SUM(CASE WHEN Type='C' THEN Total_Qty END) / SUM(Total_Qty) as `% of C`
    FROM
        input_df
    GROUP BY
        ID
""").na.fill(0)
# A CASE with no ELSE yields NULL for non-matching rows, so the SUM for a
# type that never occurs within an ID is NULL; na.fill(0) turns those into 0.

Using the PySpark API

from pyspark.sql import functions as F

output_df = (
    input_df.groupBy("ID")
    .agg(
        F.count("Prod_Name").alias("total products"),
        F.sum("Total_Qty").alias("Total Qty"),
        (F.sum(F.when(F.col("Type") == "A", F.col("Total_Qty")).otherwise(0))
         / F.sum("Total_Qty")).alias("% of A"),
        (F.sum(F.when(F.col("Type") == "B", F.col("Total_Qty")).otherwise(0))
         / F.sum("Total_Qty")).alias("% of B"),
        (F.sum(F.when(F.col("Type") == "C", F.col("Total_Qty")).otherwise(0))
         / F.sum("Total_Qty")).alias("% of C"),
    )
)
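
With the sample data above, either variant should produce one row per ID along these lines (the ratios were computed by hand from the sample quantities and are shown rounded to two decimals; Spark will print the full doubles):

output_df.show()
# +---+--------------+---------+------+------+------+
# | ID|total products|Total Qty|% of A|% of B|% of C|
# +---+--------------+---------+------+------+------+
# |  1|             5|      810|  0.30|  0.58|  0.12|
# |  2|             5|      150|  0.53|  0.00|  0.47|
# +---+--------------+---------+------+------+------+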

Approach 2: Using windows

If you would rather add these as 5 extra columns on your existing dataset, you can use similar aggregations over a window: OVER (PARTITION BY ID) in Spark SQL, or Window.partitionBy("ID") with the PySpark API, as shown below.

Using Spark SQL

  1. Make your DataFrame accessible by creating a temporary view:
     input_df.createOrReplaceTempView("input_df")
  2. Run the SQL below in your Spark session:
output_df = sparkSession.sql("""
    SELECT
        *,
        COUNT(Prod_Name) OVER (PARTITION BY ID) as `total products`,
        SUM(Total_Qty) OVER (PARTITION BY ID) as `Total Qty`,
        SUM(CASE WHEN Type='A' THEN Total_Qty END) OVER (PARTITION BY ID)
            / SUM(Total_Qty) OVER (PARTITION BY ID) as `% of A`,
        SUM(CASE WHEN Type='B' THEN Total_Qty END) OVER (PARTITION BY ID)
            / SUM(Total_Qty) OVER (PARTITION BY ID) as `% of B`,
        SUM(CASE WHEN Type='C' THEN Total_Qty END) OVER (PARTITION BY ID)
            / SUM(Total_Qty) OVER (PARTITION BY ID) as `% of C`
    FROM
        input_df
""").na.fill(0)

Note that no GROUP BY is needed here; the window functions compute the per-ID aggregates while keeping every input row.

Using the PySpark API

from pyspark.sql import functions as F
from pyspark.sql import Window

# Window covering all rows that share the same ID.
agg_window = Window.partitionBy("ID")

output_df = (
    input_df
    .withColumn("total products", F.count("Prod_Name").over(agg_window))
    .withColumn("Total Qty", F.sum("Total_Qty").over(agg_window))
    .withColumn(
        "% of A",
        F.sum(F.when(F.col("Type") == "A", F.col("Total_Qty")).otherwise(0)).over(agg_window)
        / F.sum("Total_Qty").over(agg_window),
    )
    .withColumn(
        "% of B",
        F.sum(F.when(F.col("Type") == "B", F.col("Total_Qty")).otherwise(0)).over(agg_window)
        / F.sum("Total_Qty").over(agg_window),
    )
    .withColumn(
        "% of C",
        F.sum(F.when(F.col("Type") == "C", F.col("Total_Qty")).otherwise(0)).over(agg_window)
        / F.sum("Total_Qty").over(agg_window),
    )
)
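
If you want the two-decimal figures shown in the expected output rather than full-precision doubles, you can round the ratio columns afterwards with F.round from pyspark.sql.functions; a small sketch applicable to either approach:

# Round the percentage columns to two decimals, as in the expected output.
for c in ["% of A", "% of B", "% of C"]:
    output_df = output_df.withColumn(c, F.round(F.col(c), 2))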

Let me know if this works for you.

Regarding apache-spark - Pyspark Window Functions: Counting number of categorical variables and calculating percentages, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/69285687/
