Apache Spark - Hive 内部连接、LIMIT 和自定义 UDF

Apache Spark - Hive inner join, LIMIT and custom UDF

我正在尝试 运行 配置单元中的查询:

这是最简单的设置(我知道我可以做一个 = 但我使用的是自定义 UDF,它不仅仅是一个相等比较)

数据集 a 和 b 各约 30,000 行

SELECT * FROM a INNER JOIN b ON Custom_UDF_Equals_Comparison(a.id, b.id) LIMIT 5

其中 custom_UDF_Equals_Comparison 只是在 a.id = b.id

之间进行相等性检查

当我 运行 这个查询时,我可以在我的日志输出中看到很多 m/r 任务正在 运行ning,假设它在两个数据集之间进行比较直到所有可能的排列进行了比较,并且远高于 5 的 LIMIT(我希望只有少数 m/r 任务,因为我知道大部分数据都可以在每个 table 的前几行中加入),为什么会这样发生? and/or我该如何解决?

编辑:

你好zero323,这是一个类似的问题,但不准确,它解释了为什么在使用UDF进行比较时对2个RDD进行完全比较,但它没有解释为什么LIMIT在限制时不停止比较找到 5 个。例如,如果在前 10 次连接尝试中找到 5 行,为什么在剩余的 30,000 * 30,000 次尝试中会出现这种情况。是因为在所有连接发生后应用了限制吗?例如它连接 30,000*30,000 行,然后将它们减少到 5?

编辑2:

  def levenshtein(str1: String, str2: String): Int = {
val lenStr1 = str1.length
val lenStr2 = str2.length

val d: Array[Array[Int]] = Array.ofDim(lenStr1 + 1, lenStr2 + 1)

for (i <- 0 to lenStr1) d(i)(0) = i
for (j <- 0 to lenStr2) d(0)(j) = j

for (i <- 1 to lenStr1; j <- 1 to lenStr2) {
  val cost = if (str1(i - 1) == str2(j-1)) 0 else 1

  d(i)(j) = min(
    d(i-1)(j  ) + 1,     // deletion
    d(i  )(j-1) + 1,     // insertion
    d(i-1)(j-1) + cost   // substitution
  )
}

d(lenStr1)(lenStr2)

}

def min(nums: Int*): Int = nums.min

def join_views( joinType: String, parameters: Any, col1: Any, col2: Any) : Boolean = {
if (joinType == "Equals") {
  if (col1 == null || col2 == null) {
    return false
  }

  return col1 == col2
}
else if (joinType == "Fuzzy_String") {
  if (col1 == null || col2 == null) {
    return false
  }

  val val1 = col1.asInstanceOf[String]
  val val2 = col2.asInstanceOf[String]

  val ratio = Utils.distancePercentage(val1, val2)

  if (ratio == 1.0) {
    return val1 == val2
  }

  return (ratio >= parameters.asInstanceOf[Double])
}

return false;

}

... ON join_views("Fuzzy_String", "0.1", a.col1, b.col1) 限制 5 = 20secs

... ON join_views("Fuzzy_String", "0.9", a.col1, b.col1) 限制 5 = 100secs

所以这里存在三个不同的问题:

  • Spark 通过使用散列和排序优化联接,因此这些优化仅适用于等值联接。其他类型的联接,包括依赖于 UDF 的联接,需要成对比较,因此需要笛卡尔积。您可以查看 了解详情。
  • 数据移动后的限制操作,尤其是shuffle,无法完全优化。您可以在 to provided by Sun Rui.

    中找到很好的解释

    由于没有随机播放,您的情况反而简单了很多,但您仍然需要将分区放在一起。

  • 限制优化基于分区,而不是记录。 Spark 开始检查第一个分区,如果满足条件的元素数量低于要求,它会迭代增加每次迭代采用的分区数量(据我记得该因素是 4)。如果您正在寻找罕见的事件,这可能会增加得非常快。