按计数对 collect_set 进行排序
Sorting a collect_set by count
我知道collect_set可以随机排序。
是否有不同的方法来按计数订购 collect_set?我想根据单独的 id 列的分组依据为单个列创建一组最受欢迎的项目。
你要 collect_list 然后 运行 计数吗?
如果我没理解错的话,你想做人气排名analysis.you需要使用collect_list来保留重复值。
from collections import Counter
from pyspark.sql import SparkSession
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
def elem_cnt(arr):
return ['{}({})'.format(*i) for i in Counter(arr).most_common()]
spark.udf.register('elem_cnt_udf', elem_cnt)
data = [
('AC Milan', 'Ronaldo Luiz'),
('AC Milan', 'Paolo Maldini'),
('AC Milan', 'Kaká'),
('AC Milan', 'Ronaldo Luiz'),
('AC Milan', 'Andriy Shevchenko'),
('AC Milan', 'Van Basten'),
('AC Milan', 'Ronaldo Luiz'),
('AC Milan', 'Andriy Shevchenko'),
('AC Milan', 'Van Basten'),
('Milan', 'Ronaldo Luiz'),
('Milan', 'Paolo Maldini'),
('Milan', 'Ronaldo Luiz'),
('Milan', 'Van Basten')
]
schema = """
id string,name string
"""
df = spark.createDataFrame(data, schema)
df.createOrReplaceTempView('tmp')
rank_sql = """
select id,elem_cnt_udf(collect_list(name)) rank from tmp
group by id
"""
rank_df = spark.sql(rank_sql)
rank_df.show(truncate=False)
不,没有按计数排序 collect_set
的方法,因为收集聚合方法不计算项目,没有可用于排序项目的信息。
因此,自 Spark 3.1 及更高版本以来,给定一个包含两列 id
和 item
的 dataframe
,您可以:
- 对
id
和 items
列的 groupBy 执行 count
- collect
(count, item)
用 collect_list
and struct
耦合到数组。 注意:这里可以用collect_set
代替collect_list
,但是没用因为我们确定(count, item)
的每个元素都是唯一的
- 使用
sort_array
按计数降序对数组进行排序
- 将您的阵列与
transform
映射到删除 count
。
可以翻译成如下代码:
from pyspark.sql import functions as F
final_df = dataframe.groupBy('id', 'item').count() \
.groupBy('id') \
.agg(
F.transform(
F.sort_array(
F.collect_list(F.struct("count", "item")),
asc=False
),
lambda x: x.getItem('item')
).alias('popular_items')
)
注意:如果你的spark版本低于3.1但高于1.6,你可以将transform
替换为withColumn
如下:
from pyspark.sql import functions as F
final_df = dataframe.groupBy('id', 'item').count() \
.groupBy('id') \
.agg(F.sort_array(F.collect_list(F.struct("count", "item")), asc=False).alias('popular_items')) \
.withColumn("popular_items", F.col('popular_items.item'))
例子
使用以下输入数据框:
+---+-----+
|id |item |
+---+-----+
|1 |item1|
|1 |item2|
|1 |item2|
|1 |item2|
|1 |item3|
|2 |item3|
|2 |item3|
|2 |item1|
|3 |item1|
|3 |item1|
+---+-----+
您得到以下输出:
+---+---------------------+
|id |popular_items |
+---+---------------------+
|1 |[item2, item3, item1]|
|3 |[item1] |
|2 |[item3, item1] |
+---+---------------------+
我知道collect_set可以随机排序。 是否有不同的方法来按计数订购 collect_set?我想根据单独的 id 列的分组依据为单个列创建一组最受欢迎的项目。 你要 collect_list 然后 运行 计数吗?
如果我没理解错的话,你想做人气排名analysis.you需要使用collect_list来保留重复值。
from collections import Counter
from pyspark.sql import SparkSession
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
def elem_cnt(arr):
return ['{}({})'.format(*i) for i in Counter(arr).most_common()]
spark.udf.register('elem_cnt_udf', elem_cnt)
data = [
('AC Milan', 'Ronaldo Luiz'),
('AC Milan', 'Paolo Maldini'),
('AC Milan', 'Kaká'),
('AC Milan', 'Ronaldo Luiz'),
('AC Milan', 'Andriy Shevchenko'),
('AC Milan', 'Van Basten'),
('AC Milan', 'Ronaldo Luiz'),
('AC Milan', 'Andriy Shevchenko'),
('AC Milan', 'Van Basten'),
('Milan', 'Ronaldo Luiz'),
('Milan', 'Paolo Maldini'),
('Milan', 'Ronaldo Luiz'),
('Milan', 'Van Basten')
]
schema = """
id string,name string
"""
df = spark.createDataFrame(data, schema)
df.createOrReplaceTempView('tmp')
rank_sql = """
select id,elem_cnt_udf(collect_list(name)) rank from tmp
group by id
"""
rank_df = spark.sql(rank_sql)
rank_df.show(truncate=False)
不,没有按计数排序 collect_set
的方法,因为收集聚合方法不计算项目,没有可用于排序项目的信息。
因此,自 Spark 3.1 及更高版本以来,给定一个包含两列 id
和 item
的 dataframe
,您可以:
- 对
id
和items
列的 groupBy 执行 - collect
(count, item)
用collect_list
andstruct
耦合到数组。 注意:这里可以用collect_set
代替collect_list
,但是没用因为我们确定(count, item)
的每个元素都是唯一的 - 使用
sort_array
按计数降序对数组进行排序 - 将您的阵列与
transform
映射到删除count
。
count
可以翻译成如下代码:
from pyspark.sql import functions as F
final_df = dataframe.groupBy('id', 'item').count() \
.groupBy('id') \
.agg(
F.transform(
F.sort_array(
F.collect_list(F.struct("count", "item")),
asc=False
),
lambda x: x.getItem('item')
).alias('popular_items')
)
注意:如果你的spark版本低于3.1但高于1.6,你可以将transform
替换为withColumn
如下:
from pyspark.sql import functions as F
final_df = dataframe.groupBy('id', 'item').count() \
.groupBy('id') \
.agg(F.sort_array(F.collect_list(F.struct("count", "item")), asc=False).alias('popular_items')) \
.withColumn("popular_items", F.col('popular_items.item'))
例子
使用以下输入数据框:
+---+-----+
|id |item |
+---+-----+
|1 |item1|
|1 |item2|
|1 |item2|
|1 |item2|
|1 |item3|
|2 |item3|
|2 |item3|
|2 |item1|
|3 |item1|
|3 |item1|
+---+-----+
您得到以下输出:
+---+---------------------+
|id |popular_items |
+---+---------------------+
|1 |[item2, item3, item1]|
|3 |[item1] |
|2 |[item3, item1] |
+---+---------------------+