- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在寻找一种方法来运行 spark.ml.feature.PCA
函数处理从 groupBy()
返回的分组数据调用数据帧。但我不确定这是否可能,或者如何实现。这是一个基本示例,希望能说明我想要做的事情:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import PCA
df = spark.createDataFrame([[3, 1, 1], [4, 2, 1], [5, 2, 1], [3, 3, 2], [6, 2, 2], [4, 4, 2]], ["Value1", "Value2", "ID"])
df.show()
+------+------+---+
|Value1|Value2| ID|
+------+------+---+
| 3| 1| 1|
| 4| 2| 1|
| 5| 2| 1|
| 3| 3| 2|
| 6| 2| 2|
| 4| 4| 2|
+------+------+---+
assembler = VectorAssembler(inputCols=["Value1", "Value2"], outputCol="features")
df2 = assembler.transform(df)
df2.show()
+------+------+---+---------+
|Value1|Value2| ID| features|
+------+------+---+---------+
| 3| 1| 1|[3.0,1.0]|
| 4| 2| 1|[4.0,2.0]|
| 5| 2| 1|[5.0,2.0]|
| 3| 3| 2|[3.0,3.0]|
| 6| 2| 2|[6.0,2.0]|
| 4| 4| 2|[4.0,4.0]|
+------+------+---+---------+
pca = PCA(k=1, inputCol="features", outputCol="component")
>>>> pca.fit(df2.where("ID==1")).pc
DenseMatrix(2, 1, [-0.8817, -0.4719], 0)
>>>> pca.fit(dff.where("ID==2")).pc
DenseMatrix(2, 1, [-0.8817, 0.4719], 0)
df2.groupBy("ID").map(lambda group: pca.fit(group).pc)
map()
在这样的分组数据上。有没有办法实现这一目标?
最佳答案
Spark >=3.0.0
截至 Spark 3.0.0
,您可以使用 applyInPandas
将一个简单的 Python 函数应用于当前 DataFrame 的每一组,并将结果作为另一个 DataFrame 返回。您基本上需要定义返回的 DataFrame 的输出模式。
这里我将使用 scikit-learn 的 PCA
函数而不是 Spark 实现,因为它必须应用于单个 Pandas DataFrames,而不是 Spark 的。无论如何,要找到的主成分应该是相同的。
import pandas as pd
from sklearn.decomposition import PCA
from pyspark.sql.types import StructField, StructType, DoubleType
# define PCA parameters
cols = ['Value1', 'Value2']
pca_components = 1
# define Python function
def pca_udf(pdf):
X = pdf[cols]
pca = PCA(n_components=pca_components)
PC = pca.fit_transform(X)
PC_df = pd.DataFrame(PC, columns=['PC_' + str(i+1) for i in range(pca_components)])
result = pd.concat([pdf, PC_df], axis=1, ignore_index=True)
return result
# define output schema; principal components are generated dynamically based on `pca_components`
to_append = [StructField('PC_' + str(i+1), DoubleType(), True) for i in range(pca_components)]
output_schema = StructType(df.schema.fields + to_append)
df\
.groupby('ID')\
.applyInPandas(pca_udf, output_schema)\
.show()
+------+------+---+-------------------+
|Value1|Value2| ID| PC_1|
+------+------+---+-------------------+
| 3| 1| 1| 1.1962465491226262|
| 4| 2| 1|-0.1572859751773413|
| 5| 2| 1|-1.0389605739452852|
| 3| 3| 2|-1.1755661316905914|
| 6| 2| 2| 1.941315590145264|
| 4| 4| 2|-0.7657494584546719|
+------+------+---+-------------------+
Spark 3.0.0
- 但仍然与
Spark>=2.3.0
- 解决方案类似,但我们需要实际定义一个
pandas_udf
,一个向量化的用户定义函数,由 Spark 执行,使用 Arrow 来传输数据和 Pandas 来处理数据。无论如何,定义它的概念与之前的概念相似。
import pandas as pd
from sklearn.decomposition import PCA
from pyspark.sql.types import StructField, StructType, DoubleType
from pyspark.sql.functions import pandas_udf, PandasUDFType
# macro-function that includes the pandas_udf and allows to pass it some parameters
def pca_by_group(df, cols, pca_components=1):
# build output schema for the Pandas UDF
# principal components are generated dynamically based on `pca_components`
to_append = [StructField('PC_' + str(i+1), DoubleType(), True) for i in range(pca_components)]
output_schema = StructType(df.schema.fields + to_append)
# Pandas UDF for applying PCA within each group
@pandas_udf(output_schema, functionType=PandasUDFType.GROUPED_MAP)
def pca_udf(pdf):
X = pdf[cols]
pca = PCA(n_components=pca_components)
PC = pca.fit_transform(X)
PC_df = pd.DataFrame(PC, columns=['PC_' + str(i+1) for i in range(pca_components)])
result = pd.concat([pdf, PC_df], axis=1, ignore_index=True)
return result
# apply the Pandas UDF
df = df\
.groupby('ID')\
.apply(pca_udf)
return df
new_df = pca_by_group(df, cols=['Value1', 'Value2'], pca_components=1)
new_df.show()
+------+------+---+-------------------+
|Value1|Value2| ID| PC_1|
+------+------+---+-------------------+
| 3| 1| 1| 1.1962465491226262|
| 4| 2| 1|-0.1572859751773413|
| 5| 2| 1|-1.0389605739452852|
| 3| 3| 2|-1.1755661316905914|
| 6| 2| 2| 1.941315590145264|
| 4| 4| 2|-0.7657494584546719|
+------+------+---+-------------------+
关于python - 在 PySpark 中对 groupBy 的每一组执行 PCA,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45240556/
给定输入: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] 将数字按奇数或偶数分组,然后按小于或大于 5 分组。 预期输出: [[1, 3, 5], [2, 4], [6, 8, 10
编辑: @coldspeed、@wen-ben、@ALollz 指出了我在字符串 np.nan 中犯的新手错误。答案很好,所以我不删除这个问题来保留那些答案。 原文: 我读过这个问题/答案 What'
我试图概括我提出的问题 here . mlb 数据框看起来像 Player Position Salary Year 0 Mike Wit
我认为我不需要共享整个数据框,但基本上,这是有问题的代码行(当然,已经导入了 pandas) divstack = df[df['Competitor']=='Emma Slabach'].group
我面临下一个问题:我有组(按 ID),对于所有这些组,我需要应用以下代码:如果组内位置之间的距离在 3 米以内,则需要将它们添加在一起,因此将创建一个新组(代码如何创建我在下面显示的组)。现在,我想要
我有以下数据: ,dateTime,magnitude,occurrence,dateTime_s 1,2017-11-20 08:00:09.052260,12861,1,2017-11-20 08
我按感兴趣的列对 df 进行分组: grouped = df.groupby('columnA') 现在我只想保留至少有 5 名成员的组: grouped.filter(lambda x: len(x
数据是一个时间序列,许多成员 ID 与许多类别相关联: data_df = pd.DataFrame({'Date': ['2018-09-14 00:00:22',
选择 u.UM_TOKEN_NO 、u.UM_FULLNAME、u.SECTOR、u.department_name、t.TS_PROJECT_CODE、sum(t.TS_TOTAL_HRS) 来自
我有这两个表: +---------------+-------------+---------------------+----------+---------+ | items_ordered |
我正在使用 groupby 和 sum 快速汇总两个数据集 一个包含: sequence shares 1 100 2 200 3 50 1 2
这个问题在这里已经有了答案: list around groupby results in empty groups (3 个答案) itertools groupby object not out
我有一组行,我想按标识符的值进行分组 - 存在于每一行中 - 然后对将作为结果的组进行进一步的隔离处理。 我的数据框是这样的: In [50]: df Out[50]: groupkey b
假设您要在全局范围内销售产品,并且希望在某个主要城市的某个地方设立销售办事处。您的决定将完全基于销售数字。 这将是您的(简化的)销售数据: df={ 'Product':'Chair', 'Count
我有一个将数据分组两次的查询: var query = (from a in Context.SetA() from b in Context.SetB().Where(x => x.aId == a
我有一个这种格式的数据框: value identifier 2007-01-01 0.087085 55 2007-01-01 0.703249
这个问题在这里已经有了答案: python groupby behaviour? (3 个答案) 关闭 4 年前。 我有一个这样的列表 [u'201003', u'200403', u'200803
在 Python 中,我可以使用 itertools.groupby 将具有相同键的连续元素分组。 : >>> items = [(1, 2), (1, 5), (1, 3), (2, 9), (3,
无法翻译以下 GroupBy 查询并将引发错误:不支持客户端 GroupBy IEnumerable ids = new List { 1, 2, 3 }; var q = db.Comments.W
考虑一个 Spark DataFrame,其中只有很少的列。目标是对其执行 groupBy 操作,而不将其转换为 Pandas DataFrame。等效的 Pandas groupBy 代码如下所示:
我是一名优秀的程序员,十分优秀!