Apache Spark: What is the equivalent implementation of RDD.groupByKey() using RDD.aggregateByKey()?
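The Apache Spark pyspark.RDD API documentation mentions that groupByKey() is inefficient. Instead, it recommends using reduceByKey(), aggregateByKey(), combineByKey(), or foldByKey(). These perform some of the aggregation on the workers before the shuffle, which reduces the amount of data shuffled across workers.
Given the following dataset and groupByKey() expression, what is an equivalent and efficient implementation (one that reduces cross-worker data shuffling) that does not use groupByKey() but gives the same result?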
    # sc is assumed to be an existing SparkContext
    dataset = [("a", 7), ("b", 3), ("a", 8)]
    rdd = (sc.parallelize(dataset)
           .groupByKey())
    print(sorted(rdd.mapValues(list).collect()))
Output:
    [('a', [7, 8]), ('b', [3])]
Here is one option that uses aggregateByKey(). I am curious how this can be done with reduceByKey(), combineByKey(), or foldByKey(), and what the cost/benefit of each alternative is.
    rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)])
           .aggregateByKey(list(),
                           lambda u, v: u + [v],
                           lambda u1, u2: u1 + u2))
    print(sorted(rdd.mapValues(list).collect()))
Output:
    [('a', [7, 8]), ('b', [3])]
The following implementation is slightly more memory-efficient, though less readable for Python newcomers, and it produces the same output:
    import itertools

    rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)])
           .aggregateByKey(list(),
                           lambda u, v: itertools.chain(u, [v]),
                           lambda u1, u2: itertools.chain(u1, u2)))
    print(sorted(rdd.mapValues(list).collect()))
As far as I can tell there is nothing to gain* in this particular case by using aggregateByKey or a similar function. Since you are building a list, there is no "real" reduction, and the amount of data that has to be shuffled is more or less the same.
To really observe a performance gain you need transformations that actually reduce the amount of transferred data, for example counting, computing summary statistics, or finding unique elements.
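To make the point above concrete, here is a minimal Spark-free sketch that models map-side combining on two made-up partitions and counts how many value elements would still have to cross the shuffle. The helper names (map_side_combine, shuffled_elements) and the data are hypothetical and only illustrate the idea; they are not part of any Spark API.

```python
# Hypothetical data: two partitions of (key, value) records, six records total.
partitions = [
    [("a", 7), ("b", 3), ("a", 8)],
    [("a", 1), ("b", 4), ("b", 5)],
]


def map_side_combine(partition, create, merge):
    """Combine values per key inside one partition, before any shuffle."""
    combined = {}
    for key, value in partition:
        if key in combined:
            combined[key] = merge(combined[key], value)
        else:
            combined[key] = create(value)
    return combined


def shuffled_elements(parts, create, merge, size):
    """Count the value elements that remain to be shuffled after combining."""
    return sum(
        size(acc)
        for partition in parts
        for acc in map_side_combine(partition, create, merge).values()
    )


# Summing: each partition sends a single number per key.
sum_cost = shuffled_elements(
    partitions, lambda v: v, lambda acc, v: acc + v, lambda acc: 1)

# Building lists: every original value is still carried in some list.
list_cost = shuffled_elements(
    partitions, lambda v: [v], lambda acc, v: acc + [v], len)

print(sum_cost, list_cost)
```

In this toy model the sum shrinks six records to four values, while the list version still carries all six, which is why combining gives no real saving when the goal is to group.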
Regarding the differences and benefits of using reduceByKey(), combineByKey(), or foldByKey(), there is an important conceptual difference that is easier to see when you consider the Scala API signatures.
Both reduceByKey and foldByKey map from RDD[(K, V)] to RDD[(K, V)], while the second one takes an additional zero element.
reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]
foldByKey(zeroValue: V)(func: (V, V) ⇒ V): RDD[(K, V)]
combineByKey (aggregateByKey is not shown, but it is the same type of transformation) transforms from RDD[(K, V)] to RDD[(K, C)]:
combineByKey[C](
createCombiner: (V) ⇒ C,
mergeValue: (C, V) ⇒ C,
mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]
Going back to your example, only combineByKey (and, in PySpark, aggregateByKey) is really applicable, since you are transforming from RDD[(String, Int)] to RDD[(String, List[Int])].
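The change of value type from V to C can be sketched without Spark. The function below is a simplified local model of the three-function contract (the name combine_by_key and the list-of-partitions input are assumptions made for illustration, not the PySpark implementation): values are folded into a combiner inside each partition, then the per-partition combiners are merged.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Local model of combineByKey: (K, V) pairs in, (K, C) pairs out."""
    # Step 1: within each partition, fold every V into a per-key C.
    per_partition = []
    for partition in partitions:
        combiners = {}
        for key, value in partition:
            if key in combiners:
                combiners[key] = merge_value(combiners[key], value)
            else:
                combiners[key] = create_combiner(value)
        per_partition.append(combiners)

    # Step 2: across partitions, merge the C values that share a key.
    result = {}
    for combiners in per_partition:
        for key, combiner in combiners.items():
            if key in result:
                result[key] = merge_combiners(result[key], combiner)
            else:
                result[key] = combiner
    return sorted(result.items())


# V is int, C is list of int, matching the grouping example.
partitions = [[("a", 7), ("b", 3)], [("a", 8)]]
grouped = combine_by_key(
    partitions,
    lambda v: [v],            # createCombiner: V -> C
    lambda acc, v: acc + [v], # mergeValue: (C, V) -> C
    lambda a, b: a + b,       # mergeCombiners: (C, C) -> C
)
print(grouped)  # [('a', [7, 8]), ('b', [3])]
```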
While in a dynamic language like Python it is actually possible to perform such an operation using foldByKey or reduceByKey, it makes the semantics of the code unclear and, to cite @tim-peters, "There should be one-- and preferably only one --obvious way to do it" [1].
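For completeness, here is a hedged sketch of what the reduceByKey and foldByKey route looks like. It uses small local stand-ins (reduce_by_key and fold_by_key are hypothetical helpers, not Spark functions) so it runs without a cluster. The trick is to wrap each value in a list first, so that the value type is already a list and the (V, V) to V contract holds. In PySpark the analogous chain would presumably be rdd.mapValues(lambda v: [v]).reduceByKey(lambda x, y: x + y), which is not executed here.

```python
from functools import reduce
from itertools import groupby
from operator import itemgetter


def reduce_by_key(pairs, func):
    """Local stand-in for reduceByKey: values and result share one type."""
    ordered = sorted(pairs, key=itemgetter(0))  # stable, keeps value order
    return [
        (key, reduce(func, (value for _, value in group)))
        for key, group in groupby(ordered, key=itemgetter(0))
    ]


def fold_by_key(pairs, zero, func):
    """Local stand-in for foldByKey: like reduce_by_key, plus a zero value."""
    ordered = sorted(pairs, key=itemgetter(0))
    return [
        (key, reduce(func, (value for _, value in group), zero))
        for key, group in groupby(ordered, key=itemgetter(0))
    ]


pairs = [("a", 7), ("b", 3), ("a", 8)]

# Wrap every value in a list first; this plays the role of mapValues.
wrapped = [(key, [value]) for key, value in pairs]

via_reduce = reduce_by_key(wrapped, lambda x, y: x + y)
via_fold = fold_by_key(wrapped, [], lambda x, y: x + y)

print(via_reduce)  # [('a', [7, 8]), ('b', [3])]
print(via_fold)    # [('a', [7, 8]), ('b', [3])]
```

The extra wrapping step is exactly what obscures the intent: the reader has to notice that the value type was changed beforehand.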
The difference between aggregateByKey and combineByKey is pretty much the same as between reduceByKey and foldByKey, so for a list it is mostly a matter of taste:
    def merge_value(acc, x):
        # Append one value to the per-key list in place.
        acc.append(x)
        return acc

    def merge_combiners(acc1, acc2):
        # Merge two partial lists for the same key.
        acc1.extend(acc2)
        return acc1

    rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)])
           .combineByKey(
               lambda x: [x],
               merge_value,
               merge_combiners))
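One way to see how the two relate is that an aggregateByKey-style zero value can be turned into a combineByKey-style createCombiner by applying the per-value function to a fresh copy of the zero. The sketch below illustrates that idea in plain Python; create_combiner_from_zero is a hypothetical helper, and the claim that Spark works this way internally is my understanding rather than something verified here.

```python
import copy


def create_combiner_from_zero(zero_value, seq_op):
    """Build a createCombiner from a zero value and a (C, V) -> C function."""
    # Copy the zero each time so in-place merges never share state across keys.
    return lambda value: seq_op(copy.deepcopy(zero_value), value)


def merge_value(acc, x):
    # Same in-place merge as in the example above.
    acc.append(x)
    return acc


create_combiner = create_combiner_from_zero([], merge_value)
first = create_combiner(7)
second = create_combiner(3)

print(first, second)  # [7] [3]
```

Because each combiner starts from its own copy, the in-place append does not leak values from one key into another.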
In practice you should prefer groupByKey, though. The PySpark implementation is significantly more optimized than a naive implementation like the one provided above.
1. Peters, T. PEP 20 -- The Zen of Python. (2004). https://www.python.org/dev/peps/pep-0020/
* In practice there is actually quite a lot to lose here, especially when using PySpark. The Python implementation of groupByKey is significantly more optimized than a naive combine by key. You can check Be Smart About groupByKey, created by me and @eliasah, for additional discussion.