
apache-spark - PySpark DataFrame: how to apply scipy.optimize by group


I have a piece of code that works well, but it relies on Pandas DataFrame groupby processing.
However, because the data are large (> 70 million groups), I need to convert the code to use PySpark DataFrames instead.
Here is the original code, using a Pandas DataFrame with a small example dataset:

import pandas as pd
import numpy as np
from scipy.optimize import minimize

df = pd.DataFrame({
    'y0': np.random.randn(20),
    'y1': np.random.randn(20),
    'x0': np.random.randn(20),
    'x1': np.random.randn(20),
    'grpVar': ['a', 'b'] * 10})

# Starting values
startVal = np.ones(2)*(1/2)

# Constraint: sum of coefficients = 1
cons = ({'type': 'eq', 'fun': lambda x: 1 - sum(x)})

# Bounds on coefficients
bnds = tuple([0,1] for x in startVal)

# Define a function to calculate sum of squared differences
def SumSqDif(a, df):
    return np.sum((df['y0'] - a[0]*df['x0'])**2 + (df['y1'] - a[1]*df['x1'])**2)

# Define a function to call minimize function
def RunMinimize(data, startVal, bnds, cons):
    ResultByGrp = minimize(SumSqDif, startVal, method='SLSQP',
                           bounds=bnds, constraints=cons, args=(data,))
    return ResultByGrp.x

# Do the calculation by applying the function by group:
# Create GroupBy object
grp_grpVar = df.groupby('grpVar')

Results = grp_grpVar.apply(RunMinimize, startVal=startVal, bnds=bnds, cons=cons)

Now I am trying to do this with a PySpark DataFrame.
To test the code, I converted the Pandas DataFrame into a PySpark DataFrame:
sdf = sqlContext.createDataFrame(df)
type(sdf)
# <class 'pyspark.sql.dataframe.DataFrame'>

# Create GroupBy object
Sgrp_grpVar = sdf.groupby('grpVar')

# Redefine functions
def sSumSqDif(a, sdf):
    return np.sum((sdf['y0'] - a[0]*sdf['x0'])**2 + (sdf['y1'] - a[1]*sdf['x1'])**2)

def sRunMinimize(data=sdf, startVal=startVal, bnds=bnds, cons=cons):
    ResultByGrp = minimize(sSumSqDif, startVal, method='SLSQP',
                           bounds=bnds, constraints=cons, args=(data,))
    return ResultByGrp.x

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import DoubleType
from pyspark.sql.types import StringType

udf = UserDefinedFunction(sRunMinimize, StringType())

Results = Sgrp_grpVar.agg(sRunMinimize())

However, when I try to define the user-defined function udf, I get the error shown below.
Any help correcting my mistake, or suggestions for an alternative approach, would be highly appreciated.

udf = UserDefinedFunction(sRunMinimize, StringType())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/functions.py", line 1760, in __init__
    self._judf = self._create_judf(name).......

Best Answer

You are trying to write a user-defined aggregate function, which cannot be done in pyspark (see https://stackoverflow.com/a/40030740).
What you can write instead is a UDF on the data collected as a list within each group.

First the set-up:


import pandas as pd 
import numpy as np
from scipy.optimize import minimize
import pyspark.sql.functions as psf
from pyspark.sql.types import *

df = pd.DataFrame({
    'y0': np.random.randn(20),
    'y1': np.random.randn(20),
    'x0': np.random.randn(20),
    'x1': np.random.randn(20),
    'grpVar': ['a', 'b'] * 10})
sdf = sqlContext.createDataFrame(df)

# Starting values
startVal = np.ones(2)*(1/2)
# Constraint: sum of coefficients = 1
cons = ({'type': 'eq', 'fun': lambda x: 1 - sum(x)})
# Bounds on coefficients
bnds = tuple([0,1] for x in startVal)
We will broadcast these variables, since we need to use them on every row of the aggregated DataFrame; broadcasting copies the values to every node so the executors do not have to fetch them from the driver:
sc.broadcast(startVal)
sc.broadcast(bnds)
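(Side note, not part of the original answer: sc.broadcast returns a Broadcast handle, and code running on the executors reads the payload back through .value. A minimal sketch of that pattern, with illustrative names bc_startVal and bc_bnds, which the UDF defined further below could use instead of capturing the plain Python objects in its closure:)

bc_startVal = sc.broadcast(startVal)  # keep the handles returned by broadcast()
bc_bnds = sc.broadcast(bnds)

def sRunMinimize_bc(data, cons=cons):
    # executors read the broadcast payloads back through .value
    return sRunMinimize(data, startVal=bc_startVal.value,
                        bnds=bc_bnds.value, cons=cons)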
Let's aggregate the data using collect_list. We will also change the structure of the data so that we end up with a single column (you could collect each column into a distinct column instead, but then you would have to modify the way the data is passed to the function):
Sgrp_grpVar = sdf\
.groupby('grpVar')\
.agg(psf.collect_list(psf.struct("y0", "y1", "x0", "x1")).alias("data"))
Sgrp_grpVar.printSchema()

root
|-- grpVar: string (nullable = true)
|-- data: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- y0: double (nullable = true)
| | |-- y1: double (nullable = true)
| | |-- x0: double (nullable = true)
| | |-- x1: double (nullable = true)
We can now create our UDF. The returned data type is too complex for pyspark, and pyspark does not support numpy arrays, so we need to change the function slightly:
def sSumSqDif(a, data):
    return np.sum(
        (data['y0'] - a[0]*data['x0'])**2
        + (data['y1'] - a[1]*data['x1'])**2)

def sRunMinimize(data, startVal=startVal, bnds=bnds, cons=cons):
    # data arrives as a list of Rows; transpose it into columns before building the DataFrame
    data = pd.DataFrame({k: v for k, v in zip(["y0", "y1", "x0", "x1"], zip(*data))})
    ResultByGrp = minimize(sSumSqDif, startVal, method='SLSQP',
                           bounds=bnds, constraints=cons, args=(data,))
    return ResultByGrp.x.tolist()

sRunMinimize_udf = lambda startVal, bnds, cons: psf.udf(
    lambda data: sRunMinimize(data, startVal, bnds, cons),
    ArrayType(DoubleType())
)
We can now apply this function to the data collected within each group:
Results = Sgrp_grpVar.select(
    "grpVar",
    sRunMinimize_udf(startVal, bnds, cons)("data").alias("res")
)
Results.show(truncate=False)

+------+-----------------------------------------+
|grpVar|res |
+------+-----------------------------------------+
|b |[0.4073139282953772, 0.5926860717046227] |
|a |[0.8275186444565927, 0.17248135554340727]|
+------+-----------------------------------------+
I don't think pyspark is the right tool for this, though.
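(Editorial aside, not part of the original answer: on Spark 2.3+ a grouped-map pandas UDF, or applyInPandas on Spark 3.x, delivers each group as an ordinary pandas DataFrame, so the original pandas code can be reused almost verbatim. Below is a minimal sketch only, assuming Spark 3.x with Arrow enabled, a SparkSession named spark, and df, SumSqDif, startVal, bnds, cons as defined in the question; run_minimize_grp is an illustrative name.)

import pandas as pd
from scipy.optimize import minimize

def run_minimize_grp(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf holds every row of one grpVar group as an ordinary pandas DataFrame
    res = minimize(SumSqDif, startVal, method='SLSQP',
                   bounds=bnds, constraints=cons, args=(pdf,))
    return pd.DataFrame({'grpVar': [pdf['grpVar'].iloc[0]],
                         'res': [res.x.tolist()]})

sdf = spark.createDataFrame(df)
Results = sdf.groupby('grpVar').applyInPandas(
    run_minimize_grp, schema='grpVar string, res array<double>')
Results.show(truncate=False)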

Regarding apache-spark - PySpark DataFrame: how to apply scipy.optimize by group, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/46247428/
