gpt4 book ai didi

scala - 如何使用 Scala 聚合 Spark 数据帧以获得稀疏向量?

转载 作者:行者123 更新时间:2023-12-01 10:28:37 25 4
gpt4 key购买 nike

我在 Spark 中有一个如下所示的数据框,我想按 id 列对其进行分组,然后对于分组数据中的每一行,我需要创建一个包含元素的稀疏向量来自 weight 列,位于 index 列指定的索引处。稀疏向量的长度是已知的,在这个例子中假设为 1000。

数据框df:

+-----+------+-----+
| id|weight|index|
+-----+------+-----+
|11830| 1| 8|
|11113| 1| 3|
| 1081| 1| 3|
| 2654| 1| 3|
|10633| 1| 3|
|11830| 1| 28|
|11351| 1| 12|
| 2737| 1| 26|
|11113| 3| 2|
| 6590| 1| 2|
+-----+------+-----+

我已阅读 this这有点类似于我想做的,但对于 rdd。有谁知道使用 Scala 对 Spark 中的数据框执行此操作的好方法?

到目前为止,我的尝试是首先将权重和索引收集为如下列表:

val dfWithLists = df
.groupBy("id")
.agg(collect_list("weight") as "weights", collect_list("index") as "indices"))

看起来像:

+-----+---------+----------+
| id| weights| indices|
+-----+---------+----------+
|11830| [1, 1]| [8, 28]|
|11113| [1, 3]| [3, 2]|
| 1081| [1]| [3]|
| 2654| [1]| [3]|
|10633| [1]| [3]|
|11351| [1]| [12]|
| 2737| [1]| [26]|
| 6590| [1]| [2]|
+-----+---------+----------+

然后我定义一个 udf 并执行如下操作:

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.sql.functions.udf

def toSparseVector: ((Array[Int], Array[BigInt]) => Vector) = {(a1, a2) => Vectors.sparse(1000, a1, a2.map(x => x.toDouble))}
val udfToSparseVector = udf(toSparseVector)

val dfWithSparseVector = dfWithLists.withColumn("SparseVector", udfToSparseVector($"indices", $"weights"))

但这似乎行不通,而且感觉应该有一种更简单的方法可以做到这一点,而无需先将权重和索引收集到列表中。

我是 Spark、Dataframes 和 Scala 的新手,所以非常感谢任何帮助。

最佳答案

你必须收集它们,因为向量必须是本地的,单台机器:https://spark.apache.org/docs/latest/mllib-data-types.html#local-vector

要创建稀疏向量,您有 2 个选项,使用无序(索引,值)对或指定索引和值数组: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.linalg.Vectors$

如果您可以将数据转换为不同的格式(旋转),您还可以使用 VectorAssembler: https://spark.apache.org/docs/latest/ml-features.html#vectorassembler

通过一些小的调整,您可以使您的方法奏效:

:paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val df = Seq((11830,1,8), (11113, 1, 3), (1081, 1,3), (2654, 1, 3), (10633, 1, 3), (11830, 1, 28), (11351, 1, 12), (2737, 1, 26), (11113, 3, 2), (6590, 1, 2)).toDF("id", "weight", "index")

val dfWithFeat = df
.rdd
.map(r => (r.getInt(0), (r.getInt(2), r.getInt(1).toDouble)))
.groupByKey()
.map(r => LabeledPoint(r._1, Vectors.sparse(1000, r._2.toSeq)))
.toDS

dfWithFeat.printSchema
dfWithFeat.show(10, false)


// Exiting paste mode, now interpreting.

root
|-- label: double (nullable = true)
|-- features: vector (nullable = true)

+-------+-----------------------+
|label |features |
+-------+-----------------------+
|11113.0|(1000,[2,3],[3.0,1.0]) |
|2737.0 |(1000,[26],[1.0]) |
|10633.0|(1000,[3],[1.0]) |
|1081.0 |(1000,[3],[1.0]) |
|6590.0 |(1000,[2],[1.0]) |
|11830.0|(1000,[8,28],[1.0,1.0])|
|2654.0 |(1000,[3],[1.0]) |
|11351.0|(1000,[12],[1.0]) |
+-------+-----------------------+

dfWithFeat: org.apache.spark.sql.Dataset[org.apache.spark.mllib.regression.LabeledPoint] = [label: double, features: vector]

关于scala - 如何使用 Scala 聚合 Spark 数据帧以获得稀疏向量?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45285359/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com