
apache-spark - PySpark: Invalid returnType with scalar Pandas UDFs

Reposted. Author: 行者123. Updated: 2023-12-04 03:00:41

I am trying to return a specific struct from a pandas_udf. It works on one cluster but fails on another.
I am trying to run the UDF over groups, which requires the return type to be a DataFrame.

from pyspark.sql.functions import pandas_udf
import pandas as pd
import numpy as np
from pyspark.sql.types import *

schema = StructType([
    StructField("Distance", FloatType()),
    StructField("CarId", IntegerType())
])


def haversine(lon1, lat1, lon2, lat2):
    # Calculate distance, return scalar
    return 3.5  # Removed logic to facilitate reading


@pandas_udf(schema)
def totalDistance(oneCar):
    dist = haversine(oneCar.Longtitude.shift(1),
                     oneCar.Latitude.shift(1),
                     oneCar.loc[1:, 'Longitude'],
                     oneCar.loc[1:, 'Latitude'])

    return pd.DataFrame({"CarId": oneCar['CarId'].iloc[0],
                         "Distance": np.sum(dist)},
                        index=[0])


## Calculate the overall distance made by each car
distancePerCar = df.groupBy('CarId').apply(totalDistance)

Here is the exception I get:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in returnType(self)
114 try:
--> 115 to_arrow_type(self._returnType_placeholder)
116 except TypeError:

C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\types.py in to_arrow_type(dt)
1641 else:
-> 1642 raise TypeError("Unsupported type in conversion to Arrow: " + str(dt))
1643 return arrow_type

TypeError: Unsupported type in conversion to Arrow: StructType(List(StructField(CarId,IntegerType,true),StructField(Distance,FloatType,true)))

During handling of the above exception, another exception occurred:

NotImplementedError Traceback (most recent call last)
<ipython-input-35-4f2194cfb998> in <module>()
18 km = 6367 * c
19 return km
---> 20 @pandas_udf("CarId: int, Distance: float")
21 def totalDistance(oneUser):
22 dist = haversine(oneUser.Longtitude.shift(1), oneUser.Latitude.shift(1),

C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in _create_udf(f, returnType, evalType)
62 udf_obj = UserDefinedFunction(
63 f, returnType=returnType, name=None, evalType=evalType, deterministic=True)
---> 64 return udf_obj._wrapped()
65
66

C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in _wrapped(self)
184
185 wrapper.func = self.func
--> 186 wrapper.returnType = self.returnType
187 wrapper.evalType = self.evalType
188 wrapper.deterministic = self.deterministic

C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in returnType(self)
117 raise NotImplementedError(
118 "Invalid returnType with scalar Pandas UDFs: %s is "
--> 119 "not supported" % str(self._returnType_placeholder))
120 elif self.evalType == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
121 if isinstance(self._returnType_placeholder, StructType):

NotImplementedError: Invalid returnType with scalar Pandas UDFs: StructType(List(StructField(CarId,IntegerType,true),StructField(Distance,FloatType,true))) is not supported

I also tried changing the schema to
@pandas_udf("<CarId:int,Distance:float>")


@pandas_udf("CarId:int,Distance:float")

but got the same exception. I suspect it is related to my pyarrow version being incompatible with my pyspark version.

Any help would be appreciated. Thanks!

Best Answer

As the error message says ("Invalid returnType with scalar Pandas UDFs"), you are trying to create a SCALAR vectorized pandas UDF, yet you pass a StructType schema and return a pandas DataFrame.

You should declare your function as a GROUPED MAP pandas UDF, i.e.:

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
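A minimal sketch of the grouped-map contract. The pandas side can be exercised without a Spark session; the DataFrame name `df`, the placeholder distance, and the schema are the question's, while the sample group data below is purely illustrative:

```python
import pandas as pd

# Grouped-map contract: all rows of one CarId group arrive as a
# pandas.DataFrame; one pandas.DataFrame matching the declared
# schema is returned.
def total_distance(one_car: pd.DataFrame) -> pd.DataFrame:
    dist = 3.5  # placeholder, as in the question
    return pd.DataFrame({"CarId": [one_car["CarId"].iloc[0]],
                         "Distance": [float(dist)]})

# On a Spark >= 2.3 cluster this would be registered and applied as:
#   from pyspark.sql.functions import pandas_udf, PandasUDFType
#   totalDistance = pandas_udf(schema, PandasUDFType.GROUPED_MAP)(total_distance)
#   distancePerCar = df.groupBy("CarId").apply(totalDistance)

# The contract itself can be checked with plain pandas:
group = pd.DataFrame({"CarId": [7, 7],
                      "Latitude": [1.0, 2.0],
                      "Longitude": [3.0, 4.0]})
result = total_distance(group)
```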

The difference between scalar and grouped vectorized UDFs is explained in the pyspark documentation: http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf

A scalar UDF defines a transformation: One or more pandas.Series -> A pandas.Series. The returnType should be a primitive data type, e.g., DoubleType(). The length of the returned pandas.Series must be of the same as the input pandas.Series.



To summarize, a scalar pandas UDF processes one column at a time (as a pandas Series), which gives better performance than a traditional UDF that processes one row element at a time. Note that the performance gain comes from efficient Python serialization via PyArrow.
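For contrast, a sketch of the scalar contract (the function name and unit conversion are illustrative, not from the question; the pandas function is testable on its own):

```python
import pandas as pd

# Scalar contract: one or more pandas.Series in, a pandas.Series of
# the SAME length out, with a primitive (non-struct) return type.
def km_to_miles(km: pd.Series) -> pd.Series:
    return km * 0.621371

# With Spark this would be wrapped as:
#   from pyspark.sql.functions import pandas_udf
#   from pyspark.sql.types import DoubleType
#   km_to_miles_udf = pandas_udf(km_to_miles, DoubleType())
#   df.withColumn("Miles", km_to_miles_udf(df["Distance"]))

distances = pd.Series([0.0, 100.0])
miles = km_to_miles(distances)
```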

A grouped map UDF defines transformation: A pandas.DataFrame -> A pandas.DataFrame The returnType should be a StructType describing the schema of the returned pandas.DataFrame. The length of the returned pandas.DataFrame can be arbitrary and the columns must be indexed so that their position matches the corresponding field in the schema.



A grouped pandas UDF processes multiple rows and columns at a time (as a pandas DataFrame, not to be confused with a Spark DataFrame), and is particularly useful and efficient for multivariate operations (especially when using local Python numerical-analysis and machine-learning libraries such as numpy, scipy, scikit-learn, etc.). In this case, the output is a single-row DataFrame with several columns.
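As an illustration of such a multivariate grouped-map body, here is a sketch using the textbook vectorized haversine formula. The question removed its own implementation, so this is the standard formula (the 6367 km radius matches the constant visible in the traceback), not necessarily the author's exact code:

```python
import numpy as np
import pandas as pd

def haversine(lon1, lat1, lon2, lat2):
    # Textbook great-circle distance in km, vectorized over Series.
    lon1, lat1, lon2, lat2 = map(np.radians, (lon1, lat1, lon2, lat2))
    a = (np.sin((lat2 - lat1) / 2.0) ** 2
         + np.cos(lat1) * np.cos(lat2) * np.sin((lon2 - lon1) / 2.0) ** 2)
    return 6367 * 2 * np.arcsin(np.sqrt(a))

def total_distance(one_car: pd.DataFrame) -> pd.DataFrame:
    # Distance between consecutive fixes, summed for the group;
    # the first row has no predecessor, so its NaN is skipped.
    d = haversine(one_car["Longitude"].shift(1), one_car["Latitude"].shift(1),
                  one_car["Longitude"], one_car["Latitude"])
    return pd.DataFrame({"CarId": [one_car["CarId"].iloc[0]],
                         "Distance": [float(np.nansum(d))]})

# Two fixes one degree of latitude apart: about 111 km at radius 6367.
track = pd.DataFrame({"CarId": [1, 1],
                      "Latitude": [0.0, 1.0],
                      "Longitude": [0.0, 0.0]})
out = total_distance(track)
```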

Note that I have not checked the internal logic of the code, only the methodology.

Regarding apache-spark - PySpark: Invalid returnType with scalar Pandas UDFs, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/49490059/
