sort rdd by two values and get top 10 per group
Suppose I have the following RDD in pyspark, where each row is a list:
[foo, apple]
[foo, orange]
[foo, apple]
[foo, apple]
[foo, grape]
[foo, grape]
[foo, plum]
[bar, orange]
[bar, orange]
[bar, orange]
[bar, grape]
[bar, apple]
[bar, apple]
[bar, plum]
[scrog, apple]
[scrog, apple]
[scrog, orange]
[scrog, orange]
[scrog, grape]
[scrog, plum]
I would like to show the top 3 fruits (index 1) for each group (index 0), ordered by the count of each fruit. For simplicity, assume ties don't matter much (e.g. scrog has a count of 1 for both grape and plum; I don't care which one is kept).
My goal is output like the following:
foo, apple, 3
foo, grape, 2
foo, orange, 1
bar, orange, 3
bar, apple, 2
bar, plum, 1 # <------- NOTE: could also be "grape" of count 1
scrog, orange, 2 # <---------- NOTE: "scrog" has many ties, which is okay
scrog, apple, 2
scrog, grape, 1
I can think of a likely inefficient approach (a rough sketch follows this list):
- get the unique groups and .collect() them as a list
- filter the full rdd by group, counting and sorting the fruits
- use something like zipWithIndex() to keep only the top 3 counts
- save as a new RDD in the format (<group>, <fruit>, <count>)
- finally union all the RDDs
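A rough sketch of those steps (hypothetical, untested code; presumably slow because it collects the groups and re-scans the rdd once per group):

from operator import add

groups = rdd.map(lambda x: x[0]).distinct().collect()
per_group = []
for g in groups:
    # count and sort the fruits within this one group
    counts = (rdd.filter(lambda x, g=g: x[0] == g)  # g=g avoids late binding across the loop
                 .map(lambda x: (x[1], 1))
                 .reduceByKey(add)
                 .sortBy(lambda kv: kv[1], ascending=False))
    # keep the first 3 rows and reshape to (<group>, <fruit>, <count>)
    top3 = (counts.zipWithIndex()
                  .filter(lambda t: t[1] < 3)
                  .map(lambda t, g=g: (g, t[0][0], t[0][1])))
    per_group.append(top3)
# union everything back into a single RDD
result = per_group[0]
for other in per_group[1:]:
    result = result.union(other)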
But I'm interested not only in more spark-specific approaches, but also in ones that might skip expensive operations like collect() and zipWithIndex().
As a bonus, though not required, if I did want to apply sorting/filtering to handle ties, that might be where it is best accomplished.
Any advice is much appreciated!
UPDATE: in my context, dataframes cannot be used; it must be RDDs only.
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col
spark = (SparkSession.builder.appName("foo").getOrCreate())
initial_list = [["foo", "apple"], ["foo", "orange"],
["foo", "apple"], ["foo", "apple"],
["foo", "grape"], ["foo", "grape"],
["foo", "plum"], ["bar", "orange"],
["bar", "orange"], ["bar", "orange"],
["bar", "grape"], ["bar", "apple"],
["bar", "apple"], ["bar", "plum"],
["scrog", "apple"], ["scrog", "apple"],
["scrog", "orange"], ["scrog", "orange"],
["scrog", "grape"], ["scrog", "plum"]]
# creating rdd
rdd = spark.sparkContext.parallelize(initial_list)
# converting rdd to dataframe
df = rdd.toDF()
# group by index 0 and index 1 to get count of each
df2 = df.groupby(df._1, df._2).count()
window = Window.partitionBy(df2['_1']).orderBy(df2['count'].desc())
# picking only first 3 from decreasing order of count
df3 = df2.select('*', rank().over(window).alias('rank')).filter(col('rank') <= 3)
# display the required dataframe
df3.select('_1', '_2', 'count').orderBy('_1', col('count').desc()).show()
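One caveat, not in the original code: rank() assigns tied counts the same rank, so a group with ties at the cutoff can return more than 3 rows. If exactly 3 rows per group are wanted, a small variation (a sketch reusing df2 and window from above) is to use row_number() instead:

from pyspark.sql.functions import row_number
# row_number() numbers rows within each window partition without gaps or ties,
# so each group contributes exactly 3 rows even when counts are equal
df4 = df2.select('*', row_number().over(window).alias('rn')).filter(col('rn') <= 3)
df4.select('_1', '_2', 'count').orderBy('_1', col('count').desc()).show()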
map and reduceByKey operations in pyspark
Sum with .reduceByKey, group with .groupByKey, and select the top 3 of each group with .map and heapq.nlargest.
rdd = sc.parallelize([
["foo", "apple"], ["foo", "orange"], ["foo", "apple"], ["foo", "apple"],
["foo", "grape"], ["foo", "grape"], ["foo", "plum"], ["bar", "orange"],
["bar", "orange"], ["bar", "orange"], ["bar", "grape"], ["bar", "apple"],
["bar", "apple"], ["bar", "plum"], ["scrog", "apple"], ["scrog", "apple"],
["scrog", "orange"], ["scrog", "orange"], ["scrog", "grape"], ["scrog", "plum"]
])
from operator import add
from heapq import nlargest
n = 3
results = rdd.map(lambda x: ((x[0], x[1]), 1)).reduceByKey(add) \
.map(lambda x: (x[0][0], (x[1], x[0][1]))).groupByKey() \
.map(lambda x: (x[0], nlargest(n, x[1])))
print( results.collect() )
# [('bar', [(3, 'orange'), (2, 'apple'), (1, 'plum')]),
# ('scrog', [(2, 'orange'), (2, 'apple'), (1, 'plum')]),
# ('foo', [(3, 'apple'), (2, 'grape'), (1, 'plum')])]
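To get rows shaped like the (<group>, <fruit>, <count>) output in the question, one optional extra step (not part of the answer above) is to flatten the grouped result:

flat = results.flatMap(lambda kv: [(kv[0], fruit, count) for count, fruit in kv[1]])
print(flat.collect())
# e.g. [('bar', 'orange', 3), ('bar', 'apple', 2), ('bar', 'plum', 1), ('scrog', 'orange', 2), ...]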
Standard python
For comparison, if you have a simple python list rather than an rdd, the easiest way to do the grouping in python is with a dictionary:
data = [
["foo", "apple"], ["foo", "orange"], ["foo", "apple"], ["foo", "apple"],
["foo", "grape"], ["foo", "grape"], ["foo", "plum"], ["bar", "orange"],
["bar", "orange"], ["bar", "orange"], ["bar", "grape"], ["bar", "apple"],
["bar", "apple"], ["bar", "plum"], ["scrog", "apple"], ["scrog", "apple"],
["scrog", "orange"], ["scrog", "orange"], ["scrog", "grape"], ["scrog", "plum"]
]
from heapq import nlargest
from operator import itemgetter
d = {}
for k, v in data:
    d.setdefault(k, {})
    d[k][v] = d[k].get(v, 0) + 1
print(d)
# {'foo': {'apple': 3, 'orange': 1, 'grape': 2, 'plum': 1}, 'bar': {'orange': 3, 'grape': 1, 'apple': 2, 'plum': 1}, 'scrog': {'apple': 2, 'orange': 2, 'grape': 1, 'plum': 1}}
n = 3
results = [(k, v, c) for k, subd in d.items()
           for v, c in nlargest(n, subd.items(), key=itemgetter(1))]
print(results)
# [('foo', 'apple', 3), ('foo', 'grape', 2), ('foo', 'orange', 1), ('bar', 'orange', 3), ('bar', 'apple', 2), ('bar', 'grape', 1), ('scrog', 'apple', 2), ('scrog', 'orange', 2), ('scrog', 'grape', 1)]
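A slightly more compact variant of the same idea (a sketch, not from the answer above) counts with collections.Counter, whose most_common() already does the top-n selection:

from collections import Counter

d = {}
for k, v in data:
    d.setdefault(k, Counter())[v] += 1

# Counter.most_common(n) returns the n highest-count (fruit, count) pairs per group
results = [(k, fruit, c) for k, counts in d.items()
           for fruit, c in counts.most_common(3)]
print(results)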