Spark RDD groupByKey + 加入 vs 加入性能

Spark RDD groupByKey + join vs join performance

我正在与其他用户共享的集群上使用 Spark。因此,仅根据 运行 时间来判断我的代码中哪一个运行效率更高是不可靠的。因为当我 运行 更高效的代码时,其他人可能 运行 大量数据起作用并使我的代码执行更长时间。

所以我可以在这里问两个问题吗:

  1. 我正在使用 join 函数加入 2 RDDs,我想在使用 join 之前使用 groupByKey(),像这样:

    rdd1.groupByKey().join(rdd2)
    

    似乎花了更长的时间,但是我记得当我使用 Hadoop Hive 时,group by 使我的查询 运行 更快。由于 Spark 使用惰性求值,我想知道 join 之前的 groupByKey 是否会使事情变得更快

  2. 我注意到Spark有一个SQL模块,目前我真的没有时间去尝试,但是请问[=33=有什么区别] 模块和 RDD SQL 类似函数?

  1. groupByKey 后跟 join 没有充分的理由比单独的 join 快。如果 rdd1rdd2 没有分区器或分区器不同,那么一个限制因素就是 HashPartitioning.

    所需的简单洗牌

    通过使用 groupByKey,您不仅可以通过保留分组所需的可变缓冲区来增加总成本,而且更重要的是,您可以使用额外的转换来生成更复杂的 DAG。 groupByKey + join:

    rdd1 = sc.parallelize([("a", 1), ("a", 3), ("b", 2)])
    rdd2 = sc.parallelize([("a", 5), ("c", 6), ("b", 7)])
    rdd1.groupByKey().join(rdd2)
    

    对比join独自一人:

    rdd1.join(rdd2)
    

    最后,这两个计划甚至都不等同,要获得相同的结果,您必须在第一个计划的基础上再添加一个 flatMap

  2. 这是一个相当广泛的问题,但要强调主要区别:

    • PairwiseRDDs 是任意Tuple2 元素的同构集合。对于默认操作,您希望 key 以有意义的方式可哈希,否则对类型没有严格的要求。相比之下,DataFrames 表现出更多的动态类型,但每一列只能包含来自 supported set of defined types. It is possible to define 的值,但它仍然必须使用基本值来表示。

    • DataFrame 使用 Catalyst Optimizer 生成逻辑和物理执行计划,并且可以生成高度优化的查询,而无需应用手动低级优化。基于 RDD 的操作简单地遵循依赖 DAG。这意味着在没有自定义优化的情况下性能会更差,但对执行的控制要好得多,并且有可能进行精细的分级调整。

其他一些要阅读的内容:

  • Why spark.ml don't implement any of spark.mllib algorithms?

我基本同意 zero323 的回答,但我认为 有理由期望 joingroupByKey 之后更快。 groupByKey减少数据量,按key对数据进行分区。这些都有助于后续 join.

的表现

我认为前者(减少数据大小)并不重要。为了获得后者(分区)的好处,您需要以相同的方式对另一个 RDD 进行分区。

例如:

val a = sc.parallelize((1 to 10).map(_ -> 100)).groupByKey()
val b = sc.parallelize((1 to 10).map(_ -> 100)).partitionBy(a.partitioner.get)
a.join(b).collect