
How do I reduce a spark dataframe to a maximum amount of rows for each value in a column?

I need to reduce a dataframe and export it to Parquet, and I need to make sure that it has, for example, at most 10,000 rows for each value in one column.

The dataframe I'm working with looks like this:

+-------------+-------------------+
|         Make|              Model|
+-------------+-------------------+
|      PONTIAC|           GRAND AM|
|        BUICK|            CENTURY|
|        LEXUS|             IS 300|
|MERCEDES-BENZ|           SL-CLASS|
|      PONTIAC|           GRAND AM|
|       TOYOTA|              PRIUS|
|   MITSUBISHI|      MONTERO SPORT|
|MERCEDES-BENZ|          SLK-CLASS|
|       TOYOTA|              CAMRY|
|         JEEP|           WRANGLER|
|    CHEVROLET|     SILVERADO 1500|
|       TOYOTA|             AVALON|
|         FORD|             RANGER|
|MERCEDES-BENZ|            C-CLASS|
|       TOYOTA|             TUNDRA|
|         FORD|EXPLORER SPORT TRAC|
|    CHEVROLET|           COLORADO|
|   MITSUBISHI|            MONTERO|
|        DODGE|      GRAND CARAVAN|
+-------------+-------------------+

I need to return at most 10,000 rows for each model; the current counts per model look like this:

+--------------------+-------+
|               Model|  count|
+--------------------+-------+
|                 MDX|1658647|
|               ASTRO| 682657|
|           ENTOURAGE|  72622|
|             ES 300H|  80712|
|            6 SERIES| 145252|
|           GRAN FURY|   9719|
|RANGE ROVER EVOQU...|   4290|
|        LEGACY WAGON|   2070|
|        LEGACY SEDAN|    104|
|  DAKOTA CHASSIS CAB|      8|
|              CAMARO|2028678|
|                  XT|  10009|
|             DYNASTY| 171776|
|                 944|  43044|
|         F430 SPIDER|    506|
|FLEETWOOD SEVENTY...|      6|
|         MONTE CARLO|1040806|
|             LIBERTY|2415456|
|            ESCALADE| 798832|
| SIERRA 3500 CLASSIC|   9541|
+--------------------+-------+

This is not the same question, because, as others have suggested below, that approach only retrieves rows whose count is greater than some value. I want: for each value in df['Model']: limit rows for that value (model) to 10,000 if there are 10,000 or more rows (obviously pseudocode). In other words, if a model has more than 10,000 rows, drop the rest; otherwise keep all of them.
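The intended behavior can be sketched in plain Python (no Spark; cap_rows_per_value is a hypothetical helper, and a cap of 3 stands in for the 10,000 limit):

```python
from collections import defaultdict

def cap_rows_per_value(rows, key_index, cap):
    """Keep at most `cap` rows for each distinct value in column `key_index`."""
    seen = defaultdict(int)   # value -> number of rows kept so far
    kept = []
    for row in rows:
        value = row[key_index]
        if seen[value] < cap:
            seen[value] += 1
            kept.append(row)
    return kept

rows = [('PONTIAC', 'GRAND AM'), ('TOYOTA', 'PRIUS'), ('PONTIAC', 'GRAND AM'),
        ('PONTIAC', 'GRAND AM'), ('PONTIAC', 'GRAND AM'), ('TOYOTA', 'PRIUS')]
capped = cap_rows_per_value(rows, key_index=1, cap=3)
# GRAND AM is capped at 3 rows; PRIUS keeps both of its rows
```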

Simply:

import pyspark.sql.functions as F

df = df.groupBy("Model").agg(F.count(F.lit(1)).alias("Count"))
df = df.filter(df["Count"] < 10000).select("Model", "Count")

df.write.parquet("data.parquet")

If I understand your question correctly, you want to sample some rows (for example 10,000), but only from records whose count is greater than 10,000. If so, this is the answer:

from pyspark.sql.functions import count, lit

df = df.groupBy('Make', 'Model').agg(count(lit(1)).alias('count'))
df = df.filter(df['count']>10000).select('Model','count')
df.write.parquet('output.parquet')
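Note that both snippets above aggregate first, so each model collapses to a single (Model, count) row and the original rows are gone. A plain-Python contrast (toy data, a threshold of 3 instead of 10,000, all names illustrative):

```python
from collections import Counter

rows = [('TOYOTA', 'PRIUS')] * 4 + [('LEXUS', 'IS 300')]

# Aggregate-then-filter: one (model, count) pair per model over the threshold
counts = Counter(model for _, model in rows)
large = {m: c for m, c in counts.items() if c > 3}

# What the question asks for: keep up to N of the *original* rows per model
kept, seen = [], Counter()
for row in rows:
    if seen[row[1]] < 3:
        seen[row[1]] += 1
        kept.append(row)    # 3 PRIUS rows plus the single IS 300 row
```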

I'll modify the question slightly so the result can be visualized here, by reducing the maximum number of rows per distinct value to 2 instead of 10,000.

Sample dataframe:

df = spark.createDataFrame(
  [('PONTIAC', 'GRAND AM'), ('BUICK', 'CENTURY'), ('LEXUS', 'IS 300'),
   ('MERCEDES-BENZ', 'SL-CLASS'), ('PONTIAC', 'GRAND AM'), ('TOYOTA', 'PRIUS'),
   ('MITSUBISHI', 'MONTERO SPORT'), ('MERCEDES-BENZ', 'SLK-CLASS'), ('TOYOTA', 'CAMRY'),
   ('JEEP', 'WRANGLER'), ('MERCEDES-BENZ', 'SL-CLASS'), ('PONTIAC', 'GRAND AM'),
   ('TOYOTA', 'PRIUS'), ('MITSUBISHI', 'MONTERO SPORT'), ('MERCEDES-BENZ', 'SLK-CLASS'),
   ('TOYOTA', 'CAMRY'), ('JEEP', 'WRANGLER'), ('CHEVROLET', 'SILVERADO 1500'),
   ('TOYOTA', 'AVALON'), ('FORD', 'RANGER'), ('MERCEDES-BENZ', 'C-CLASS'),
   ('TOYOTA', 'TUNDRA'), ('TOYOTA', 'PRIUS'), ('MITSUBISHI', 'MONTERO SPORT'),
   ('MERCEDES-BENZ', 'SLK-CLASS'), ('TOYOTA', 'CAMRY'), ('JEEP', 'WRANGLER'),
   ('CHEVROLET', 'SILVERADO 1500'), ('TOYOTA', 'AVALON'), ('FORD', 'RANGER'),
   ('MERCEDES-BENZ', 'C-CLASS'), ('TOYOTA', 'TUNDRA'), ('FORD', 'EXPLORER SPORT TRAC'),
   ('CHEVROLET', 'COLORADO'), ('MITSUBISHI', 'MONTERO'), ('DODGE', 'GRAND CARAVAN')],
  ['Make', 'Model']
)

Let's count the rows per model:

df.groupby('Model').count().show()

+-------------------+-----+
|              Model|count|
+-------------------+-----+
|             AVALON|    2|
|            CENTURY|    1|
|             TUNDRA|    2|
|           WRANGLER|    3|
|           GRAND AM|    3|
|EXPLORER SPORT TRAC|    1|
|            C-CLASS|    2|
|      MONTERO SPORT|    3|
|              CAMRY|    3|
|      GRAND CARAVAN|    1|
|     SILVERADO 1500|    2|
|              PRIUS|    3|
|            MONTERO|    1|
|           COLORADO|    1|
|             RANGER|    2|
|          SLK-CLASS|    3|
|           SL-CLASS|    2|
|             IS 300|    1|
+-------------------+-----+

If I understand your question correctly, you can partition by Model and assign a row number to each row within its partition:

from pyspark.sql import Window
from pyspark.sql.functions import row_number, desc

win_1 = Window.partitionBy('Model').orderBy(desc('Make'))
df = df.withColumn('row_num', row_number().over(win_1))

Then filter to the rows with row_num <= 2:

df = df.filter(df.row_num <= 2).select('Make', 'Model')
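What the window computation does can be emulated in plain Python (a sketch with a hypothetical helper top_n_per_model; no Spark needed):

```python
from collections import defaultdict

def top_n_per_model(rows, n=2):
    """Emulate row_number() over Window.partitionBy('Model').orderBy(desc('Make'))
    followed by filtering to row_num <= n."""
    groups = defaultdict(list)
    for make, model in rows:
        groups[model].append((make, model))
    result = []
    for group in groups.values():
        group.sort(key=lambda r: r[0], reverse=True)  # orderBy(desc('Make'))
        result.extend(group[:n])                      # keep row_num <= n
    return result

rows = [('PONTIAC', 'GRAND AM'), ('PONTIAC', 'GRAND AM'),
        ('PONTIAC', 'GRAND AM'), ('BUICK', 'CENTURY')]
out = top_n_per_model(rows, n=2)
# 2 GRAND AM rows survive; the single CENTURY row is kept
```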

In total there should be 2+1+2+2+2+1+2+2+2+1+2+2+1+1+2+2+2+1 = 30 rows.

Final result:

+-------------+-------------------+
|         Make|              Model|
+-------------+-------------------+
|       TOYOTA|             AVALON|
|       TOYOTA|             AVALON|
|        BUICK|            CENTURY|
|       TOYOTA|             TUNDRA|
|       TOYOTA|             TUNDRA|
|         JEEP|           WRANGLER|
|         JEEP|           WRANGLER|
|      PONTIAC|           GRAND AM|
|      PONTIAC|           GRAND AM|
|         FORD|EXPLORER SPORT TRAC|
|MERCEDES-BENZ|            C-CLASS|
|MERCEDES-BENZ|            C-CLASS|
|   MITSUBISHI|      MONTERO SPORT|
|   MITSUBISHI|      MONTERO SPORT|
|       TOYOTA|              CAMRY|
|       TOYOTA|              CAMRY|
|        DODGE|      GRAND CARAVAN|
|    CHEVROLET|     SILVERADO 1500|
|    CHEVROLET|     SILVERADO 1500|
|       TOYOTA|              PRIUS|
|       TOYOTA|              PRIUS|
|   MITSUBISHI|            MONTERO|
|    CHEVROLET|           COLORADO|
|         FORD|             RANGER|
|         FORD|             RANGER|
|MERCEDES-BENZ|          SLK-CLASS|
|MERCEDES-BENZ|          SLK-CLASS|
|MERCEDES-BENZ|           SL-CLASS|
|MERCEDES-BENZ|           SL-CLASS|
|        LEXUS|             IS 300|
+-------------+-------------------+

I think you should use row_number with a window, combining partitionBy and orderBy, and then filter on the resulting rank with your limit. For example, to take a random shuffle and limit the sample to at most 10,000 rows per value:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

window = Window.partitionBy(df['Model']).orderBy(F.rand())
df = df.select(F.col('*'), 
               F.row_number().over(window).alias('row_number')) \
               .where(F.col('row_number') <= 10000)
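As a sanity check of the pattern, here is a plain-Python stand-in (hypothetical helper sample_per_group; a seeded shuffle replaces F.rand()):

```python
import random
from collections import defaultdict

def sample_per_group(rows, key_index, n, seed=42):
    """Keep at most n randomly chosen rows per distinct key value -- a
    plain-Python stand-in for row_number() over a rand()-ordered window."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key_index]].append(row)
    rng = random.Random(seed)
    sampled = []
    for group in groups.values():
        rng.shuffle(group)         # random order within the partition
        sampled.extend(group[:n])  # row_number <= n
    return sampled

rows = [('TOYOTA', 'CAMRY')] * 5 + [('FORD', 'RANGER')] * 2
out = sample_per_group(rows, key_index=1, n=3)
# at most 3 CAMRY rows; both RANGER rows survive
```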