gpt4 book ai didi

python - 在 PySpark 中对 groupBy 的每一组执行 PCA

转载 作者:行者123 更新时间:2023-12-04 11:04:18 26 4
gpt4 key购买 nike

我正在寻找一种方法来运行 spark.ml.feature.PCA函数处理从 groupBy() 返回的分组数据调用数据帧。但我不确定这是否可能,或者如何实现。这是一个基本示例,希望能说明我想要做的事情:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import PCA

df = spark.createDataFrame([[3, 1, 1], [4, 2, 1], [5, 2, 1], [3, 3, 2], [6, 2, 2], [4, 4, 2]], ["Value1", "Value2", "ID"])

df.show()
+------+------+---+
|Value1|Value2| ID|
+------+------+---+
| 3| 1| 1|
| 4| 2| 1|
| 5| 2| 1|
| 3| 3| 2|
| 6| 2| 2|
| 4| 4| 2|
+------+------+---+

assembler = VectorAssembler(inputCols=["Value1", "Value2"], outputCol="features")

df2 = assembler.transform(df)

df2.show()
+------+------+---+---------+
|Value1|Value2| ID| features|
+------+------+---+---------+
| 3| 1| 1|[3.0,1.0]|
| 4| 2| 1|[4.0,2.0]|
| 5| 2| 1|[5.0,2.0]|
| 3| 3| 2|[3.0,3.0]|
| 6| 2| 2|[6.0,2.0]|
| 4| 4| 2|[4.0,4.0]|
+------+------+---+---------+

pca = PCA(k=1, inputCol="features", outputCol="component")

此时,我有了要使用的数据框和 pca 对象。我现在想对数据帧执行 PCA,但按“ID”分组,因此我将获取 ID 为 1 的所有功能的 PCA,以及 ID 为 2 的所有功能的 PCA,只需返回组件。我可以通过以下方式手动获取这些:
>>>> pca.fit(df2.where("ID==1")).pc
DenseMatrix(2, 1, [-0.8817, -0.4719], 0)
>>>> pca.fit(dff.where("ID==2")).pc
DenseMatrix(2, 1, [-0.8817, 0.4719], 0)

但我想在数据帧中的所有不同 ID 上并行运行它,例如:
df2.groupBy("ID").map(lambda group: pca.fit(group).pc)

但是你不能用 map()在这样的分组数据上。有没有办法实现这一目标?

最佳答案

Spark >=3.0.0
截至 Spark 3.0.0 ,您可以使用 applyInPandas 将一个简单的 Python 函数应用于当前 DataFrame 的每一组,并将结果作为另一个 DataFrame 返回。您基本上需要定义返回的 DataFrame 的输出模式。
这里我将使用 scikit-learn PCA 函数而不是 Spark 实现,因为它必须应用于单个 Pandas DataFrames,而不是 Spark 的。无论如何,要找到的主成分应该是相同的。

import pandas as pd
from sklearn.decomposition import PCA
from pyspark.sql.types import StructField, StructType, DoubleType


# define PCA parameters
cols = ['Value1', 'Value2']
pca_components = 1


# define Python function
def pca_udf(pdf):
X = pdf[cols]
pca = PCA(n_components=pca_components)
PC = pca.fit_transform(X)
PC_df = pd.DataFrame(PC, columns=['PC_' + str(i+1) for i in range(pca_components)])
result = pd.concat([pdf, PC_df], axis=1, ignore_index=True)
return result


# define output schema; principal components are generated dynamically based on `pca_components`
to_append = [StructField('PC_' + str(i+1), DoubleType(), True) for i in range(pca_components)]
output_schema = StructType(df.schema.fields + to_append)


df\
.groupby('ID')\
.applyInPandas(pca_udf, output_schema)\
.show()

+------+------+---+-------------------+
|Value1|Value2| ID| PC_1|
+------+------+---+-------------------+
| 3| 1| 1| 1.1962465491226262|
| 4| 2| 1|-0.1572859751773413|
| 5| 2| 1|-1.0389605739452852|
| 3| 3| 2|-1.1755661316905914|
| 6| 2| 2| 1.941315590145264|
| 4| 4| 2|-0.7657494584546719|
+------+------+---+-------------------+

Spark <3.0.0
之前 Spark 3.0.0 - 但仍然与 Spark>=2.3.0 - 解决方案类似,但我们需要实际定义一个 pandas_udf ,一个向量化的用户定义函数,由 Spark 执行,使用 Arrow 来传输数据和 Pandas 来处理数据。无论如何,定义它的概念与之前的概念相似。
import pandas as pd
from sklearn.decomposition import PCA
from pyspark.sql.types import StructField, StructType, DoubleType
from pyspark.sql.functions import pandas_udf, PandasUDFType


# macro-function that includes the pandas_udf and allows to pass it some parameters
def pca_by_group(df, cols, pca_components=1):
# build output schema for the Pandas UDF
# principal components are generated dynamically based on `pca_components`
to_append = [StructField('PC_' + str(i+1), DoubleType(), True) for i in range(pca_components)]
output_schema = StructType(df.schema.fields + to_append)

# Pandas UDF for applying PCA within each group
@pandas_udf(output_schema, functionType=PandasUDFType.GROUPED_MAP)
def pca_udf(pdf):
X = pdf[cols]
pca = PCA(n_components=pca_components)
PC = pca.fit_transform(X)
PC_df = pd.DataFrame(PC, columns=['PC_' + str(i+1) for i in range(pca_components)])
result = pd.concat([pdf, PC_df], axis=1, ignore_index=True)
return result

# apply the Pandas UDF
df = df\
.groupby('ID')\
.apply(pca_udf)

return df


new_df = pca_by_group(df, cols=['Value1', 'Value2'], pca_components=1)
new_df.show()

+------+------+---+-------------------+
|Value1|Value2| ID| PC_1|
+------+------+---+-------------------+
| 3| 1| 1| 1.1962465491226262|
| 4| 2| 1|-0.1572859751773413|
| 5| 2| 1|-1.0389605739452852|
| 3| 3| 2|-1.1755661316905914|
| 6| 2| 2| 1.941315590145264|
| 4| 4| 2|-0.7657494584546719|
+------+------+---+-------------------+

关于python - 在 PySpark 中对 groupBy 的每一组执行 PCA,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45240556/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com