gpt4 book ai didi

scala - 优化 Spark 作业,它必须计算每个条目的相似度并为每个条目输出前 N 个相似项

转载 作者:行者123 更新时间:2023-12-01 16:24:25 24 4
gpt4 key购买 nike

我有一个 Spark 作业需要计算基于电影内容的相似性。有46k电影。每部电影都由一组 SparseVectors 表示(每个向量是电影字段之一的特征向量,例如标题、情节、流派、 Actor 等)。例如,对于 Actor 和流派,向量显示给定 Actor 在电影中是存在 (1) 还是不存在 (0)。

任务是为每部电影找到前 10 部相似的电影。
我设法在 Scala 中编写了一个脚本来执行所有这些计算并完成工作。它适用于较小的电影集,例如 1000 部电影,但不适用于整个数据集(内存不足等)。

我进行此计算的方法是在电影数据集上使用交叉连接。然后通过仅采用movie1_id 此时的数据集仍将包含 46000^2/2 行,即 1058000000。
每一行都有大量的数据。

然后我计算每一行的相似度分数。计算相似度后,我将 movie1_id 相同的结果分组,并使用 Window 函数取前 N 个项目(类似于此处描述的方式: Spark get top N highest score results for each (item1, item2, score))按相似度分数按降序对它们进行排序。

问题是 - 它可以在 Spark 中更有效地完成吗?例如。无需执行 crossJoin?

还有一个问题——Spark 如何处理如此庞大的数据帧(1058000000 行,由多个 SparseVectors 组成)?它是否必须一次将所有这些保存在内存中?或者它是否以某种方式一块一块地处理这些数据帧?

我正在使用以下函数来计算电影向量之间的相似度:

def intersectionCosine(movie1Vec: SparseVector, movie2Vec: SparseVector): Double = {
val a: BSV[Double] = toBreeze(movie1Vec)
val b: BSV[Double] = toBreeze(movie2Vec)

var dot: Double = 0
var offset: Int = 0
while( offset < a.activeSize) {
val index: Int = a.indexAt(offset)
val value: Double = a.valueAt(offset)

dot += value * b(index)
offset += 1
}

val bReduced: BSV[Double] = new BSV(a.index, a.index.map(i => b(i)), a.index.length)
val maga: Double = magnitude(a)
val magb: Double = magnitude(bReduced)

if (maga == 0 || magb == 0)
return 0
else
return dot / (maga * magb)
}

Dataframe 中的每一行都包含两个连接的类:
final case class MovieVecData(imdbID: Int,
Title: SparseVector,
Decade: SparseVector,
Plot: SparseVector,
Genres: SparseVector,
Actors: SparseVector,
Countries: SparseVector,
Writers: SparseVector,
Directors: SparseVector,
Productions: SparseVector,
Rating: Double
)

最佳答案

它可以更有效地完成,只要您可以使用近似值,并且不需要精确的结果(或精确的数字或​​结果)。

类似于我对 Efficient string matching in Apache Spark 的回答您可以使用 LSH,包括:

  • BucketedRandomProjectionLSH 近似欧几里得距离。
  • MinHashLSH 来近似 Jaccard 距离。

  • 如果特征空间很小(或可以合理减少)并且每个类别都相对较小,您还可以手动优化您的代码:
  • explode特征数组从单个记录生成#features 记录。
  • 按特征自连接结果,计算距离并过滤出候选者(当且仅当它们共享特定的分类特征时,才会比较每对记录)。
  • 使用您当前的代码获取最高记录。

  • 一个最小的例子是(将其视为伪代码):
    import org.apache.spark.ml.linalg._

    // This is oversimplified. In practice don't assume only sparse scenario
    val indices = udf((v: SparseVector) => v.indices)

    val df = Seq(
    (1L, Vectors.sparse(1024, Array(1, 3, 5), Array(1.0, 1.0, 1.0))),
    (2L, Vectors.sparse(1024, Array(3, 8, 12), Array(1.0, 1.0, 1.0))),
    (3L, Vectors.sparse(1024, Array(3, 5), Array(1.0, 1.0))),
    (4L, Vectors.sparse(1024, Array(11, 21), Array(1.0, 1.0))),
    (5L, Vectors.sparse(1024, Array(21, 32), Array(1.0, 1.0)))
    ).toDF("id", "features")

    val possibleMatches = df
    .withColumn("key", explode(indices($"features")))
    .transform(df => df.alias("left").join(df.alias("right"), Seq("key")))

    val closeEnough(threshold: Double) = udf((v1: SparseVector, v2: SparseVector) => intersectionCosine(v1, v2) > threshold)

    possilbeMatches.filter(closeEnough($"left.features", $"right.features")).select($"left.id", $"right.id").distinct

    请注意,仅当散列/特征具有足够的选择性(并且最佳稀疏)时,这两种解决方案才值得开销。在上面显示的示例中,您将只比较集合 {1, 2, 3} 和 {4, 5} 内的行,而不是集合之间的行。

    然而,在最坏的情况下(M 条记录,N 条特征),我们可以进行 N M2 次比较,而不是 M2

    关于scala - 优化 Spark 作业,它必须计算每个条目的相似度并为每个条目输出前 N 个相似项,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50088548/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com