gpt4 book ai didi

python-3.x - 是否可以将标量值与 Pandas Series 一起传递给 Pandas UDF 函数

转载 作者:行者123 更新时间:2023-12-05 06:23:26 25 4
gpt4 key购买 nike

我正在尝试对 pyspark 数据帧的两列使用 scipy.optimize.minimize 函数。

x0 参数作为数组传递给 Pandas UDF 函数时,出现以下错误:

TypeError: Invalid argument, not a string or column: [0.9  0.5  2.5  5.   0.33] of type <class 'numpy.ndarray'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

这是我要最小化的函数

def eb_func(theta, n, e):
"""
# Function to be Minimized

:param theta: float
:param n: Pandas.Series
:param e: Pandas.Series
:return: float

"""
print("Entering EB_Func")
res = res = np.prod(theta[4] * neg_bin(n, e, theta[0], theta[1]) + (1 - theta[4]) * neg_bin(n, e, theta[2], theta[3]))
return res

这是我的 neg_bin 函数:

@pandas_udf('double', PandasUDFType.SCALAR)
def neg_bin(n, e, alpha, beta):
"""

:param n:
:param e:
:param alpha:
:param beta:
:return:
"""
res_expo = gammaln(alpha + n) - gammaln(n + 1) - gammaln(alpha)
res = np.exp(res_expo)
res = res / (1 + beta / (e + 0.01)) ** n
res = res / (1 + e / beta) ** alpha
return res

这些是我的参数:

x0 = np.array([0.9, 0.5, 2.5, 5, 0.33])
bounds = ([0.000001, 200], [0.000001, 200], [0.000001, 200], [0.000001, 200], [0.000001, 1])

这是我尝试调用 scipy.optimize.minimize 函数的地方。

# Define a function to call minimize function
def RunMinimize(data):
Result = minimize(eb_func, x0, args=(data.Adolescent_a, data.Adolescent_e), method='L-BFGS-B', bounds=bounds, options={'disp': True, 'maxiter': 1000, 'eps': np.repeat(1e-4, 5)})
return Result.x


RunMinimize(df_adol)

我是 PySpark 的新手,我可以在 Pandas 中做到这一点,但现在我有一个庞大的数据集,而 Pandas 需要花费大量时间来处理它。

以下是预期的输出格式:这是我在 Pandas 中得到的输出

[1.00000000e-06, 1.46304225e+00, 1.00000000e-06, 6.39066185e+00, 1.00000000e-06])

我无法将 theta 值传递给 neg_bin 函数。因为 neg_bin 函数只需要 pandas.Series 作为输入。如果可能的话,我正在寻找一种解决方法,将 theta 值作为标量与 pandas.Series 一起作为输入发送到 neg_bin 函数。

感谢任何帮助。 TIA。

最佳答案

我尝试按照您的示例进行操作,但不幸的是,并非所有函数都已定义,也没有包含任何导入语句。因此,我在下面提供了一个更简单的示例(将温度与 C 和 F 相互转换)。

想法是将 pandas UDF 包装在另一个采用必要的标量参数的函数中。该示例在 pyspark 3.2+ 中运行,对于早期版本可能需要进行一些调整。

import pandas as pd
import pyspark.sql.functions as F
import pyspark.sql.types as T

df = [[1. , 1.1], [2., 2.1]]
df = spark.createDataFrame(df, schema = ['x', 'y'])

def temp_to_temp(from_temp: str, to_temp: str) -> pd.Series:
@F.pandas_udf(T.DoubleType())
def temp_to_temp_inner(value: pd.Series) -> pd.Series:
if to_temp == 'C':
if from_temp == 'F':
return (value - 32)*5./9
else:
return value
elif to_temp == 'F':
if from_temp == 'C':
return value*9./5 + 32
else:
return value
return temp_to_temp_inner

res = df.select(temp_to_temp('C', 'F')(F.col('x')).alias('temp (F)'))

res.show()
# |temp (F)|
# +--------+
# | 33.8|
# | 35.6|
# +--------+

其中 spark 是 spark session 。

关于python-3.x - 是否可以将标量值与 Pandas Series 一起传递给 Pandas UDF 函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58320633/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com