使用 Python 计算 Spark 中 Pairwise (K,V) RDD 中每个 KEY 的平均值
Calculating the averages for each KEY in a Pairwise (K,V) RDD in Spark with Python
我想与 Python 解决方案共享这个特定的 Apache Spark,因为它的文档很差。
我想通过 KEY 计算 K/V 对(存储在 Pairwise RDD 中)的平均值。示例数据如下所示:
>>> rdd1.take(10) # Show a small sample.
[(u'2013-10-09', 7.60117302052786),
(u'2013-10-10', 9.322709163346612),
(u'2013-10-10', 28.264462809917358),
(u'2013-10-07', 9.664429530201343),
(u'2013-10-07', 12.461538461538463),
(u'2013-10-09', 20.76923076923077),
(u'2013-10-08', 11.842105263157894),
(u'2013-10-13', 32.32514177693762),
(u'2013-10-13', 26.249999999999996),
(u'2013-10-13', 10.693069306930692)]
现在下面的代码序列是不太理想的方法,但它确实有效。这是我在找到更好的解决方案之前所做的。这并不可怕,但是——正如您将在答案部分看到的那样——有一种更简洁、更有效的方法。
>>> import operator
>>> countsByKey = sc.broadcast(rdd1.countByKey()) # SAMPLE OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...}
>>> rdd1 = rdd1.reduceByKey(operator.add) # Calculate the numerators (i.e. the SUMs).
>>> rdd1 = rdd1.map(lambda x: (x[0], x[1]/countsByKey.value[x[0]])) # Divide each SUM by it's denominator (i.e. COUNT)
>>> print(rdd1.collect())
[(u'2013-10-09', 11.235365503035176),
(u'2013-10-07', 23.39500642456595),
... snip ...
]
现在更好的方法是使用 rdd.aggregateByKey()
方法。因为此方法在 Apache Spark 中的 Python 文档中的记录太少了 -- 这就是我编写此问答 的原因 -- 直到最近我一直在使用上面的代码序列。但同样,它的效率较低,因此除非必要,否则避免这样做。
以下是如何使用 rdd.aggregateByKey()
方法(推荐):
通过KEY,同时计算SUM(我们要计算的平均值的分子)和COUNT(我们要计算的平均值的分母):
>>> aTuple = (0,0) # As of Python3, you can't pass a literal sequence to a function.
>>> rdd1 = rdd1.aggregateByKey(aTuple, lambda a,b: (a[0] + b, a[1] + 1),
lambda a,b: (a[0] + b[0], a[1] + b[1]))
以下关于上面每个 a
和 b
对的含义正确的地方(因此您可以想象发生的事情):
First lambda expression for Within-Partition Reduction Step::
a: is a TUPLE that holds: (runningSum, runningCount).
b: is a SCALAR that holds the next Value
Second lambda expression for Cross-Partition Reduction Step::
a: is a TUPLE that holds: (runningSum, runningCount).
b: is a TUPLE that holds: (nextPartitionsSum, nextPartitionsCount).
最后,计算每个KEY的平均值,并收集结果。
>>> finalResult = rdd1.mapValues(lambda v: v[0]/v[1]).collect()
>>> print(finalResult)
[(u'2013-09-09', 11.235365503035176),
(u'2013-09-01', 23.39500642456595),
(u'2013-09-03', 13.53240060820617),
(u'2013-09-05', 13.141148418977687),
... snip ...
]
我希望这个 aggregateByKey()
的问答对您有所帮助。
在我看来,与具有两个 lambda 的 aggregateByKey 等效的更具可读性的是:
rdd1 = rdd1 \
.mapValues(lambda v: (v, 1)) \
.reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1]))
这样,整个平均计算将是:
avg_by_key = rdd1 \
.mapValues(lambda v: (v, 1)) \
.reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1])) \
.mapValues(lambda v: v[0]/v[1]) \
.collectAsMap()
只是添加一个关于此问题的直观且较短(但不好)的解决方案的注释。 Sam's Teach Yourself Apache Spark in 24 Hours这本书在上一章已经很好地解释了这个问题。
使用groupByKey
可以像这样轻松解决问题:
rdd = sc.parallelize([
(u'2013-10-09', 10),
(u'2013-10-09', 10),
(u'2013-10-09', 13),
(u'2013-10-10', 40),
(u'2013-10-10', 45),
(u'2013-10-10', 50)
])
rdd \
.groupByKey() \
.mapValues(lambda x: sum(x) / len(x)) \
.collect()
输出:
[('2013-10-10', 45.0), ('2013-10-09', 11.0)]
这很直观也很吸引人,但是不要使用它! groupByKey
不对映射器进行任何组合,并将所有单独的键值对带到缩减器。
尽量避免groupByKey
。使用@pat 的 reduceByKey
解决方案。
prismalytics.io 的答案略有改进。
在某些情况下,计算总和可能会溢出数字,因为我们正在对大量值求和。我们可以改为保留平均值并继续从平均值计算平均值,并且两个部分的数量减少。
如果您有两个部分的平均值分别为 (a1, c1) 和 (a2, c2),则总体平均值为:
total/counts = (total1 + total2)/ (count1 + counts2) = (a1*c1 + a2*c2)/(c1+c2)
如果记为R = c2/c1,则可以进一步改写为a1/(1+R) + a2*R/(1+R)
如果进一步将Ri记为1/(1+R),则可以写为a1*Ri + a2*R*Ri
myrdd = sc.parallelize([1.1, 2.4, 5, 6.0, 2, 3, 7, 9, 11, 13, 10])
sumcount_rdd = myrdd.map(lambda n : (n, 1))
def avg(A, B):
R = 1.0*B[1]/A[1]
Ri = 1.0/(1+R);
av = A[0]*Ri + B[0]*R*Ri
return (av, B[1] + A[1]);
(av, counts) = sumcount_rdd.reduce(avg)
print(av)
通过简单地使用 mapValues 代替 map 和 reduceByKey 代替 reduce,这种方法可以转换为键值。
本文来自:https://www.knowbigdata.com/blog/interview-questions-apache-spark-part-2
我想与 Python 解决方案共享这个特定的 Apache Spark,因为它的文档很差。
我想通过 KEY 计算 K/V 对(存储在 Pairwise RDD 中)的平均值。示例数据如下所示:
>>> rdd1.take(10) # Show a small sample.
[(u'2013-10-09', 7.60117302052786),
(u'2013-10-10', 9.322709163346612),
(u'2013-10-10', 28.264462809917358),
(u'2013-10-07', 9.664429530201343),
(u'2013-10-07', 12.461538461538463),
(u'2013-10-09', 20.76923076923077),
(u'2013-10-08', 11.842105263157894),
(u'2013-10-13', 32.32514177693762),
(u'2013-10-13', 26.249999999999996),
(u'2013-10-13', 10.693069306930692)]
现在下面的代码序列是不太理想的方法,但它确实有效。这是我在找到更好的解决方案之前所做的。这并不可怕,但是——正如您将在答案部分看到的那样——有一种更简洁、更有效的方法。
>>> import operator
>>> countsByKey = sc.broadcast(rdd1.countByKey()) # SAMPLE OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...}
>>> rdd1 = rdd1.reduceByKey(operator.add) # Calculate the numerators (i.e. the SUMs).
>>> rdd1 = rdd1.map(lambda x: (x[0], x[1]/countsByKey.value[x[0]])) # Divide each SUM by it's denominator (i.e. COUNT)
>>> print(rdd1.collect())
[(u'2013-10-09', 11.235365503035176),
(u'2013-10-07', 23.39500642456595),
... snip ...
]
现在更好的方法是使用 rdd.aggregateByKey()
方法。因为此方法在 Apache Spark 中的 Python 文档中的记录太少了 -- 这就是我编写此问答 的原因 -- 直到最近我一直在使用上面的代码序列。但同样,它的效率较低,因此除非必要,否则避免这样做。
以下是如何使用 rdd.aggregateByKey()
方法(推荐):
通过KEY,同时计算SUM(我们要计算的平均值的分子)和COUNT(我们要计算的平均值的分母):
>>> aTuple = (0,0) # As of Python3, you can't pass a literal sequence to a function.
>>> rdd1 = rdd1.aggregateByKey(aTuple, lambda a,b: (a[0] + b, a[1] + 1),
lambda a,b: (a[0] + b[0], a[1] + b[1]))
以下关于上面每个 a
和 b
对的含义正确的地方(因此您可以想象发生的事情):
First lambda expression for Within-Partition Reduction Step::
a: is a TUPLE that holds: (runningSum, runningCount).
b: is a SCALAR that holds the next Value
Second lambda expression for Cross-Partition Reduction Step::
a: is a TUPLE that holds: (runningSum, runningCount).
b: is a TUPLE that holds: (nextPartitionsSum, nextPartitionsCount).
最后,计算每个KEY的平均值,并收集结果。
>>> finalResult = rdd1.mapValues(lambda v: v[0]/v[1]).collect()
>>> print(finalResult)
[(u'2013-09-09', 11.235365503035176),
(u'2013-09-01', 23.39500642456595),
(u'2013-09-03', 13.53240060820617),
(u'2013-09-05', 13.141148418977687),
... snip ...
]
我希望这个 aggregateByKey()
的问答对您有所帮助。
在我看来,与具有两个 lambda 的 aggregateByKey 等效的更具可读性的是:
rdd1 = rdd1 \
.mapValues(lambda v: (v, 1)) \
.reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1]))
这样,整个平均计算将是:
avg_by_key = rdd1 \
.mapValues(lambda v: (v, 1)) \
.reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1])) \
.mapValues(lambda v: v[0]/v[1]) \
.collectAsMap()
只是添加一个关于此问题的直观且较短(但不好)的解决方案的注释。 Sam's Teach Yourself Apache Spark in 24 Hours这本书在上一章已经很好地解释了这个问题。
使用groupByKey
可以像这样轻松解决问题:
rdd = sc.parallelize([
(u'2013-10-09', 10),
(u'2013-10-09', 10),
(u'2013-10-09', 13),
(u'2013-10-10', 40),
(u'2013-10-10', 45),
(u'2013-10-10', 50)
])
rdd \
.groupByKey() \
.mapValues(lambda x: sum(x) / len(x)) \
.collect()
输出:
[('2013-10-10', 45.0), ('2013-10-09', 11.0)]
这很直观也很吸引人,但是不要使用它! groupByKey
不对映射器进行任何组合,并将所有单独的键值对带到缩减器。
尽量避免groupByKey
。使用@pat 的 reduceByKey
解决方案。
prismalytics.io 的答案略有改进。
在某些情况下,计算总和可能会溢出数字,因为我们正在对大量值求和。我们可以改为保留平均值并继续从平均值计算平均值,并且两个部分的数量减少。
如果您有两个部分的平均值分别为 (a1, c1) 和 (a2, c2),则总体平均值为: total/counts = (total1 + total2)/ (count1 + counts2) = (a1*c1 + a2*c2)/(c1+c2)
如果记为R = c2/c1,则可以进一步改写为a1/(1+R) + a2*R/(1+R) 如果进一步将Ri记为1/(1+R),则可以写为a1*Ri + a2*R*Ri
myrdd = sc.parallelize([1.1, 2.4, 5, 6.0, 2, 3, 7, 9, 11, 13, 10])
sumcount_rdd = myrdd.map(lambda n : (n, 1))
def avg(A, B):
R = 1.0*B[1]/A[1]
Ri = 1.0/(1+R);
av = A[0]*Ri + B[0]*R*Ri
return (av, B[1] + A[1]);
(av, counts) = sumcount_rdd.reduce(avg)
print(av)
通过简单地使用 mapValues 代替 map 和 reduceByKey 代替 reduce,这种方法可以转换为键值。
本文来自:https://www.knowbigdata.com/blog/interview-questions-apache-spark-part-2