gpt4 book ai didi

apache-spark - 用户定义的函数要应用于 PySpark 中的 Window?

转载 作者:行者123 更新时间:2023-12-03 11:11:00 25 4
gpt4 key购买 nike

我正在尝试将用户定义的函数应用于 PySpark 中的 Window。我读过 UDAF 可能是要走的路,但我找不到任何具体的东西。

举个例子(取自这里:Xinh's Tech Blog 并针对 PySpark 进行了修改):

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import avg

spark = SparkSession.builder.master("local").config(conf=SparkConf()).getOrCreate()

a = spark.createDataFrame([[1, "a"], [2, "b"], [3, "c"], [4, "d"], [5, "e"]], ['ind', "state"])

customers = spark.createDataFrame([["Alice", "2016-05-01", 50.00],
["Alice", "2016-05-03", 45.00],
["Alice", "2016-05-04", 55.00],
["Bob", "2016-05-01", 25.00],
["Bob", "2016-05-04", 29.00],
["Bob", "2016-05-06", 27.00]],
["name", "date", "amountSpent"])

customers.show()

window_spec = Window.partitionBy("name").orderBy("date").rowsBetween(-1, 1)

result = customers.withColumn( "movingAvg", avg(customers["amountSpent"]).over(window_spec))

result.show()

我正在申请 avg已内置于 pyspark.sql.functions , 但如果不是 avg我想使用更复杂的东西并编写自己的函数,我该怎么做?

最佳答案

Spark >= 3.0 :

SPARK-24561 - 带有 Pandas udf(有界窗口)的用户定义窗口函数正在进行中。详情请关注相关JIRA。

Spark >= 2.4 :

SPARK-22239 - 带有 Pandas udf(无界窗口)的用户定义窗口函数引入了对基于 Pandas 的带有无界窗口的窗口函数的支持。一般结构是

return_type: DataType

@pandas_udf(return_type, PandasUDFType.GROUPED_AGG)
def f(v):
return ...

w = (Window
.partitionBy(grouping_column)
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

df.withColumn('foo', f('bar').over(w))

有关详细示例,请参阅 doctestsunit tests

Spark < 2.4

你不能。窗口函数需要 UserDefinedAggregateFunction 或等效对象,而不是 UserDefinedFunction ,并且不可能在 PySpark 中定义一个。

但是,在 PySpark 2.3 或更高版本中,您可以定义矢量化 pandas_udf ,它可以应用于分组数据。您可以找到一个工作示例 Applying UDFs on GroupedData in PySpark (with functioning python example) 。虽然 Pandas 不提供窗口函数的直接等价物,但有足够的表现力来实现任何类似窗口的逻辑,尤其是 pandas.DataFrame.rolling 。此外,与 GroupedData.apply 一起使用的函数可以返回任意数量的行。

您还可以从 PySpark Spark: How to map Python with Scala or Java User Defined Functions? 调用 Scala UDAF。

关于apache-spark - 用户定义的函数要应用于 PySpark 中的 Window?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48160252/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com