使用 union 比加入 apache spark 更有效,还是没关系?
is it more efficient to use unions rather than joins in apache spark, or does it not matter?
最近我 运行 在 apache spark 集群上工作,我打算在两个 rdds 上进行内部连接。但是我后来认为对于这个计算我可以通过使用 union、reduceByKey 和 filter 来避免连接。但这基本上就是 join 已经在做的事情吗?
假设我在 rdd 中有具有以下结构的对象:
{ 'key':'someKey', 'value': <some positive integer> }
然后为了避免加入我会写:
leftRDD = rdd1.map(lambda y: (y['key'], (1, y['value'], -1))
rightRDD = rdd2.map(lambda y: (y['key'], (0, -1, y['value']))
joinedRDD = (leftRDD + rightRDD) \
.reduceByKey(lambda x,y: (max(x[0],y[0]), max(x[1],y[1]), max(x[2],y[2])) \
.filter(lambda y: y[1][0] == 1)
joinedRDD 现在可以有效地获得与我进行内部连接相同的结果,但是为了避免连接而增加的复杂性值得吗?
Pyspark 连接的可扩展性通常很差 - 因此您对手动 RDD 操作的预感可能是好的。
特别是 pyspark 中的联接会丢失分区 - 因此不支持联合联接。
关于细节:您应该注意 reduceByKey 的语义:它输出与输入相同的数据结构。根据您的代码,您可能会期待一些不同的东西。
查看 (PySpark) Nested lists after reduceByKey 了解有关 reduceByKey 的更多信息。
更新
本机 scala 版本在保留现有分区方面更积极(不引起完全洗牌):
if (self.partitioner == Some(partitioner)) {
self.mapPartitions(iter => {
val context = TaskContext.get()
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else {
new ShuffledRDD[K, V, C](self, partitioner)
.setSerializer(serializer)
.setAggregator(aggregator)
.setMapSideCombine(mapSideCombine)
}
相反,python 版本总是引起随机播放:
shuffled = locally_combined.partitionBy(numPartitions)
正是出于这个原因,我注意到使用 reduceByKey 的 pyspark 的性能问题。
总体 'answer' 不是明确的是或否:我是这么说的 "could be yes" - 取决于你如何编写自定义 pyspark RDD 代码与仅使用 join() - 总是引起洗牌。
最近我 运行 在 apache spark 集群上工作,我打算在两个 rdds 上进行内部连接。但是我后来认为对于这个计算我可以通过使用 union、reduceByKey 和 filter 来避免连接。但这基本上就是 join 已经在做的事情吗?
假设我在 rdd 中有具有以下结构的对象:
{ 'key':'someKey', 'value': <some positive integer> }
然后为了避免加入我会写:
leftRDD = rdd1.map(lambda y: (y['key'], (1, y['value'], -1))
rightRDD = rdd2.map(lambda y: (y['key'], (0, -1, y['value']))
joinedRDD = (leftRDD + rightRDD) \
.reduceByKey(lambda x,y: (max(x[0],y[0]), max(x[1],y[1]), max(x[2],y[2])) \
.filter(lambda y: y[1][0] == 1)
joinedRDD 现在可以有效地获得与我进行内部连接相同的结果,但是为了避免连接而增加的复杂性值得吗?
Pyspark 连接的可扩展性通常很差 - 因此您对手动 RDD 操作的预感可能是好的。
特别是 pyspark 中的联接会丢失分区 - 因此不支持联合联接。
关于细节:您应该注意 reduceByKey 的语义:它输出与输入相同的数据结构。根据您的代码,您可能会期待一些不同的东西。
查看 (PySpark) Nested lists after reduceByKey 了解有关 reduceByKey 的更多信息。
更新
本机 scala 版本在保留现有分区方面更积极(不引起完全洗牌):
if (self.partitioner == Some(partitioner)) {
self.mapPartitions(iter => {
val context = TaskContext.get()
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else {
new ShuffledRDD[K, V, C](self, partitioner)
.setSerializer(serializer)
.setAggregator(aggregator)
.setMapSideCombine(mapSideCombine)
}
相反,python 版本总是引起随机播放:
shuffled = locally_combined.partitionBy(numPartitions)
正是出于这个原因,我注意到使用 reduceByKey 的 pyspark 的性能问题。
总体 'answer' 不是明确的是或否:我是这么说的 "could be yes" - 取决于你如何编写自定义 pyspark RDD 代码与仅使用 join() - 总是引起洗牌。