gpt4 book ai didi

hadoop - Apache Spark - Hive 内部连接、LIMIT 和自定义 UDF

转载 作者:可可西里 更新时间:2023-11-01 14:52:27 25 4
gpt4 key购买 nike

我正在尝试在配置单元中运行查询:

这是最简单的设置(我知道我可以做一个 = 但我使用的是自定义 UDF,它不仅仅是一个相等比较)

数据集 a 和 b 各有 30,000 行左右

SELECT * FROM a INNER JOIN b ON Custom_UDF_Equals_Comparison(a.id, b.id) LIMIT 5

其中 custom_UDF_Equals_Comparison 只是在 a.id = b.id 之间进行相等性检查

当我运行这个查询时,我可以在我的日志输出中看到很多 m/r 任务正在运行,假设它在两个数据集之间进行比较,直到比较所有可能的排列,并且远高于 5 的限制(我会预计只有少数 m/r 任务,因为我知道大部分数据可以在每个表的前几行中连接),为什么会发生这种情况?和/或我该如何解决?

编辑:

你好 zero323,这是一个类似的问题但不准确,它解释了为什么在使用 UDF 进行比较时执行 2 个 RDD 之间的完整比较,但它没有解释为什么 LIMIT 在限制为 5 时不停止比较成立。例如,如果在前 10 次连接尝试中找到 5 行,为什么在剩余的 30,000 * 30,000 次尝试中会出现这种情况。是因为在所有连接发生后应用了限制吗?例如它连接 30,000*30,000 行,然后将它们减少到只有 5 行?

编辑2:

  def levenshtein(str1: String, str2: String): Int = {
val lenStr1 = str1.length
val lenStr2 = str2.length

val d: Array[Array[Int]] = Array.ofDim(lenStr1 + 1, lenStr2 + 1)

for (i <- 0 to lenStr1) d(i)(0) = i
for (j <- 0 to lenStr2) d(0)(j) = j

for (i <- 1 to lenStr1; j <- 1 to lenStr2) {
val cost = if (str1(i - 1) == str2(j-1)) 0 else 1

d(i)(j) = min(
d(i-1)(j ) + 1, // deletion
d(i )(j-1) + 1, // insertion
d(i-1)(j-1) + cost // substitution
)
}

d(lenStr1)(lenStr2)

def min(nums: Int*): Int = nums.min

def join_views( joinType: String, parameters: Any, col1: Any, col2: Any) : Boolean = {
if (joinType == "Equals") {
if (col1 == null || col2 == null) {
return false
}

return col1 == col2
}
else if (joinType == "Fuzzy_String") {
if (col1 == null || col2 == null) {
return false
}

val val1 = col1.asInstanceOf[String]
val val2 = col2.asInstanceOf[String]

val ratio = Utils.distancePercentage(val1, val2)

if (ratio == 1.0) {
return val1 == val2
}

return (ratio >= parameters.asInstanceOf[Double])
}

return false;

... 在 join_views("Fuzzy_String", "0.1", a.col1, b.col1) LIMIT 5 = 20secs

... 在 join_views("Fuzzy_String", "0.9", a.col1, b.col1) LIMIT 5 = 100secs

最佳答案

所以这里存在三个不同的问题:

  • Spark 通过使用散列和排序来优化联接,因此这些优化仅适用于等值联接。其他类型的联接,包括依赖于 UDF 的联接,需要成对比较,因此需要笛卡尔积。你可以查看Why using a UDF in a SQL query leads to cartesian product?了解详情。
  • 数据移动后的限制操作,尤其是随机播放,无法完全优化。您可以在 nice answer 中找到很好的解释。至 Towards limiting the big RDDSun Rui 提供.

    自相矛盾的是,由于缺乏洗牌,您的情况更简单,但您仍然必须将分区放在一起。

  • 限制优化基于分区,而不是记录。 Spark 开始检查第一个分区,如果满足条件的元素数量低于要求,它会迭代增加每次迭代采用的分区数量(据我记得该因素是 4)。如果您正在寻找罕见的事件,这可能会增加得非常快。

关于hadoop - Apache Spark - Hive 内部连接、LIMIT 和自定义 UDF,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39126474/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com