gpt4 book ai didi

apache-spark - 奇怪的性能问题 Spark LSH MinHash approxSimilarityJoin

转载 作者:行者123 更新时间:2023-12-03 14:49:32 24 4
gpt4 key购买 nike

我正在使用 Apache Spark ML LSH 的 approxSimilarityJoin 方法加入 2 个数据集,但我看到了一些奇怪的行为。

在(内部)连接之后,数据集有点偏斜,但是每次完成一个或多个任务都需要花费过多的时间。

sparkui-1

正如您所看到的,每个任务的中位数是 6 毫秒(我在较小的源数据集上运行它进行测试),但 1 个任务需要 10 分钟。它几乎不使用任何 CPU 周期,它实际上连接了数据,但是速度太慢了。
下一个最慢的任务在 14 秒内运行,有 4 倍多的记录并且实际上溢出到磁盘。

如果你看 sparkuisql

连接本身是根据 minhash 规范和 udf 计算匹配对之间的 jaccard 距离的 pos & hashValue (minhash) 上的两个数据集之间的内部连接。

分解哈希表:

modelDataset.select(
struct(col("*")).as(inputName), posexplode(col($(outputCol))).as(explodeCols))

杰卡德距离函数:

 override protected[ml] def keyDistance(x: Vector, y: Vector): Double = {
val xSet = x.toSparse.indices.toSet
val ySet = y.toSparse.indices.toSet
val intersectionSize = xSet.intersect(ySet).size.toDouble
val unionSize = xSet.size + ySet.size - intersectionSize
assert(unionSize > 0, "The union of two input sets must have at least 1 elements")
1 - intersectionSize / unionSize
}

连接处理过的数据集:

// Do a hash join on where the exploded hash values are equal.
val joinedDataset = explodedA.join(explodedB, explodeCols)
.drop(explodeCols: _*).distinct()

// Add a new column to store the distance of the two rows.
val distUDF = udf((x: Vector, y: Vector) => keyDistance(x, y), DataTypes.DoubleType)
val joinedDatasetWithDist = joinedDataset.select(col("*"),
distUDF(col(s"$leftColName.${$(inputCol)}"), col(s"$rightColName.${$(inputCol)}")).as(distCol)
)

// Filter the joined datasets where the distance are smaller than the threshold.
joinedDatasetWithDist.filter(col(distCol) < threshold)

我尝试了缓存、重新分区甚至启用的组合 spark.speculation ,都无济于事。

数据由必须匹配的带状疱疹地址文本组成: 53536, Evansville, WI => 53, 35, 36, ev, va, an, ns, vi, il, ll, le, wi将与城市或 zip 有拼写错误的记录有很短的距离。

这给出了非常准确的结果,但可能是连接偏斜的原因。

我的问题是:
  • 什么可能导致这种差异? (一项任务需要很长时间,即使它的记录较少)
  • 如何在不丢失准确性的情况下防止 minhash 中的这种偏差?
  • 有没有更好的方法来大规模地做到这一点? (我无法 Jaro-Winkler/levenshtein 将数百万条记录与位置数据集中的所有记录进行比较)
  • 最佳答案

    可能有点晚了,但无论如何我都会在这里发布我的答案以帮助其他人。我最近在匹配拼错的公司名称( All executors dead MinHash LSH PySpark approxSimilarityJoin self-join on EMR cluster )方面遇到了类似的问题。有人通过建议使用 NGrams 来减少数据偏差来帮助我。这对我帮助很大。您也可以尝试使用例如3 克或 4 克。
    我不知道数据有多脏,但你可以尝试使用状态。它已经大大减少了可能的匹配数量。
    真正帮助我提高匹配准确性的是通过在每个组件上运行标签传播算法对连接组件(由 MinHashLSH 生成的连接匹配组)进行后处理。这也允许您增加 N(NGrams),从而减轻数据倾斜的问题,在 approxSimilarityJoin 中设置 jaccard 距离参数。不太紧密,并使用标签传播进行后处理。
    最后,我目前正在研究使用 skipgrams 来匹配它。我发现在某些情况下它效果更好,并在一定程度上减少了数据倾斜。

    关于apache-spark - 奇怪的性能问题 Spark LSH MinHash approxSimilarityJoin,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51403709/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com