gpt4 book ai didi

pandas - PySpark SQL 中用户定义的聚合函数

转载 作者:行者123 更新时间:2023-12-03 22:59:58 24 4
gpt4 key购买 nike

如何在 PySpark SQL 中实现用户定义的聚合函数 (UDAF)?

pyspark version = 3.0.2
python version = 3.7.10
作为一个最小的例子,我想用 UDAF 替换 AVG 聚合函数:
sc = SparkContext()
sql = SQLContext(sc)
df = sql.createDataFrame(
pd.DataFrame({'id': [1, 1, 2, 2], 'value': [1, 2, 3, 4]}))
df.createTempView('df')
rv = sql.sql('SELECT id, AVG(value) FROM df GROUP BY id').toPandas()
其中 rv 将是:
In [2]: rv
Out[2]:
id avg(value)
0 1 1.5
1 2 3.5
UDAF 如何替换 AVG在查询中?
例如,这不起作用
import numpy as np
def udf_avg(x):
return np.mean(x)
sql.udf.register('udf_avg', udf_avg)
rv = sql.sql('SELECT id, udf_avg(value) FROM df GROUP BY id').toPandas()
这个想法是在纯 Python 中实现一个 UDAF,用于 SQL 聚合函数不支持的处理(例如低通滤波器)。

最佳答案

您可以将 Pandas UDF 与 GROUPED_AGG 一起使用类型。它从 Spark 接收作为 Pandas 系列的列,因此您可以调用 Series.mean列上。

import pyspark.sql.functions as F

@F.pandas_udf('float', F.PandasUDFType.GROUPED_AGG)
def avg_udf(s):
return s.mean()

df2 = df.groupBy('id').agg(avg_udf('value'))

df2.show()
+---+--------------+
| id|avg_udf(value)|
+---+--------------+
| 1| 1.5|
| 2| 3.5|
+---+--------------+
也可以注册它以在 SQL 中使用:
df.createTempView('df')
spark.udf.register('avg_udf', avg_udf)

df2 = spark.sql("select id, avg_udf(value) from df group by id")
df2.show()
+---+--------------+
| id|avg_udf(value)|
+---+--------------+
| 1| 1.5|
| 2| 3.5|
+---+--------------+

关于pandas - PySpark SQL 中用户定义的聚合函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66538664/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com