PySpark DataFrame: get the top 5 rows per group using SQL or a pandas DataFrame

I am trying to get the top 5 items per district, ranked by rate_increase. I am trying to use spark.sql as follows:

Input:

   district   item   rate_increase(%)
     Arba     coil    500
     Arba     pen     -85
     Arba     hat     50
     Cebu     oil     -40
     Cebu     pen     1100

Top5item = spark.sql('select district, item, rate_increase, ROW_NUMBER() OVER (PARTITION BY district ORDER BY rate_increase DESC) AS RowNum from rateTable where rate_increase > 0')

This works. How can I filter down to the top 5 items per district in the same statement? I tried the following via spark.sql; is this a better way?

Top5item = spark.sql('select district, item from (select district, item, rate_increase, ROW_NUMBER() OVER (PARTITION BY district ORDER BY rate_increase DESC) AS RowNum from rateTable where rate_increase > 0) where RowNum <= 5 order by district')

Expected output:

   district   item   rate_increase(%)
     Arba     coil    500
     Arba     hat     50
     Cebu     pen     1100

Thanks.

Keep the query execution order in mind:

From/Joins -> Where -> Group by -> Having -> Select

A where clause like where RowNum <= 5 does not work at the same query level, because WHERE is evaluated before SELECT, so at that point the engine does not yet know what RowNum is.
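For illustration, this is the failing pattern (a minimal sketch, assuming the ddf_1 temp view registered later in this answer): the RowNum alias is only produced by SELECT, so a WHERE clause at the same level cannot resolve it, and Spark rejects the query with an analysis error instead of filtering.

spark.sql("""
select district, item, `rate_increase(%)`,
       row_number() over (partition by district order by `rate_increase(%)` desc) as RowNum
from ddf_1
where RowNum <= 5   -- fails: RowNum is not a column of ddf_1 when WHERE is evaluated
""").show()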

Try using a subquery block:

spark.sql("""

select district, item , `rate_increase(%)` from (
  select row_number() over (partition by district order by `rate_increase(%)` desc) as RowNum, district,item, `rate_increase(%)`  from ddf_1  where  `rate_increase(%)` > 0 )
where RowNum <= 5 order by district, RowNum

""").show()

Output:

+--------+----+----------------+
|district|item|rate_increase(%)|
+--------+----+----------------+
|    Arba|coil|             500|
|    Arba| hat|              50|
|    Cebu| pen|            1100|
+--------+----+----------------+
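For completeness, the same top-5-per-district logic can also be written with the DataFrame API instead of a SQL string. A minimal, self-contained sketch (the column is named rate_increase here only to avoid backtick-quoting the (%) characters):

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

# Same sample data as the question.
ddf_1 = spark.createDataFrame(
    [("Arba", "coil", 500), ("Arba", "pen", -85), ("Arba", "hat", 50),
     ("Cebu", "oil", -40), ("Cebu", "pen", 1100)],
    ["district", "item", "rate_increase"])

# Rank items within each district, highest rate_increase first.
w = Window.partitionBy("district").orderBy(F.col("rate_increase").desc())

top5 = (ddf_1.filter(F.col("rate_increase") > 0)           # positive increases only
             .withColumn("RowNum", F.row_number().over(w)) # 1 = biggest increase
             .filter(F.col("RowNum") <= 5)                 # keep the top 5 per district
             .orderBy("district", "RowNum")
             .drop("RowNum"))

top5.show()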

I tried using pandas as a simpler solution:

Top5item = df.sort_values('rate_increase(%)', ascending = True).groupby(['district']).head(5)

Even after grouping by district and sorting rate_increase(%) in ascending order, it still does not work. Thanks.
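As a side note, the attempt above needs two fixes to work in pandas alone: drop the non-positive rates first, and sort with ascending=False (ascending order puts the largest increases last, so groupby(...).head(5) would pick the smallest ones). A minimal sketch using the same sample data:

import pandas as pd

df = pd.DataFrame({
    'district': ["Arba", "Arba", "Arba", "Cebu", "Cebu"],
    'item': ['coil', 'pen', 'hat', 'oil', 'pen'],
    'rate_increase(%)': [500, -85, 50, -40, 1100]})

Top5item = (df[df['rate_increase(%)'] > 0]                     # positive increases only
            .sort_values('rate_increase(%)', ascending=False)  # largest first
            .groupby('district')                               # per district...
            .head(5))                                          # ...keep the top 5
print(Top5item)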

Lily, you can use pandas to read the data from a CSV or to create a pandas DataFrame as shown below, and then convert it to a Spark DataFrame:

import pandas as pd

data_1 = {
    'district': ["Arba", "Arba", "Arba", "Cebu", "Cebu"],
    'item': ['coil', 'pen', 'hat', 'oil', 'pen'],
    'rate_increase(%)': [500, -85, 50, -40, 1100]}
pandas_df = pd.DataFrame(data_1)

ddf_1 = spark.createDataFrame(pandas_df)
ddf_1.createOrReplaceTempView("ddf_1")

output = spark.sql("""
select district, item, `rate_increase(%)` from (
  select row_number() over (partition by district order by `rate_increase(%)` desc) as RowNum,
         district, item, `rate_increase(%)`
  from ddf_1
  where `rate_increase(%)` > 0)
where RowNum <= 5 order by district, RowNum
""")

output.show()

+--------+----+----------------+
|district|item|rate_increase(%)|
+--------+----+----------------+
|    Arba|coil|             500|
|    Arba| hat|              50|
|    Cebu| pen|            1100|
+--------+----+----------------+