gpt4 book ai didi

apache-spark - Spark,数据帧 : apply transformer/estimator on groups

转载 作者:行者123 更新时间:2023-12-03 09:24:41 25 4
gpt4 key购买 nike

我有一个如下所示的 DataFrame:

+-----------+-----+------------+
| userID|group| features|
+-----------+-----+------------+
|12462563356| 1| [5.0,43.0]|
|12462563701| 2| [1.0,8.0]|
|12462563701| 1| [2.0,12.0]|
|12462564356| 1| [1.0,1.0]|
|12462565487| 3| [2.0,3.0]|
|12462565698| 2| [1.0,1.0]|
|12462565698| 1| [1.0,1.0]|
|12462566081| 2| [1.0,2.0]|
|12462566081| 1| [1.0,15.0]|
|12462566225| 2| [1.0,1.0]|
|12462566225| 1| [9.0,85.0]|
|12462566526| 2| [1.0,1.0]|
|12462566526| 1| [3.0,79.0]|
|12462567006| 2| [11.0,15.0]|
|12462567006| 1| [10.0,15.0]|
|12462567006| 3| [10.0,15.0]|
|12462586595| 2| [2.0,42.0]|
|12462586595| 3| [2.0,16.0]|
|12462589343| 3| [1.0,1.0]|
+-----------+-----+------------+

其中列类型为:userID:Long、group:Int 和 features:vector。

这已经是一个分组的 DataFrame,即用户 ID 将出现在特定组中最多一次。

我的目标是扩展 features每组列。

有没有办法申请 feature transformer (就我而言,我想申请 StandardScaler ) 每组而不是将其应用于完整的 DataFrame。

附言使用 ML 不是强制性的,因此如果解决方案基于 MLlib,则没问题。

最佳答案

计算统计

Spark >= 3.0

现在 Summarizer支持标准偏差,所以

val summary = data
.groupBy($"group")
.agg(Summarizer.metrics("mean", "std")
.summary($"features").alias("stats"))
.as[(Int, (Vector, Vector))]
.collect.toMap

Spark >= 2.3

在 Spark 2.3 或更高版本中,您还可以使用 Summarizer :

import org.apache.spark.ml.stat.Summarizer

val summaryVar = data
.groupBy($"group")
.agg(Summarizer.metrics("mean", "variance")
.summary($"features").alias("stats"))
.as[(Int, (Vector, Vector))]
.collect.toMap

并调整下游代码以处理差异而不是标准偏差。

Spark < 2.0,Spark < 2.3,调整了 ml and mllib Vectors 之间的转换.

您可以使用与默认值几乎相同的代码按组计算统计信息 Scaler :

import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.sql.Row

// Compute Multivariate Statistics
val summary = data.select($"group", $"features")
.rdd
.map {
case Row(group: Int, features: Vector) => (group, features)
}
.aggregateByKey(new MultivariateOnlineSummarizer)(/* Create an empty new MultivariateOnlineSummarizer */
(agg, v) => agg.add(v), /* seqOp : Add a new sample Vector to this summarizer, and update the statistical summary. */
(agg1, agg2) => agg1.merge(agg2)) /* combOp : As MultivariateOnlineSummarizer accepts a merge action with another MultivariateOnlineSummarizer, and update the statistical summary. */
.mapValues {
s => (
s.variance.toArray.map(math.sqrt(_)), /* compute the square root variance for each key */
s.mean.toArray /* fetch the mean for each key */
)
}.collectAsMap

转型

如果预期的组数相对较少,您可以广播这些:

val summaryBd = sc.broadcast(summary)

并转换您的数据:

val scaledRows = df.rdd.map{ case Row(userID, group: Int, features: Vector) =>
val (stdev, mean) = summaryBd.value(group)
val vs = features.toArray.clone()
for (i <- 0 until vs.size) {
vs(i) = if(stdev(i) == 0.0) 0.0 else (vs(i) - mean(i)) * (1 / stdev(i))
}
Row(userID, group, Vectors.dense(vs))
}
val scaledDf = sqlContext.createDataFrame(scaledRows, df.schema)

否则,您可以简单地加入。将其包装为带有组列作为参数的 ML 转换器应该不难。

关于apache-spark - Spark,数据帧 : apply transformer/estimator on groups,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35405314/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com