How do I reduce a Spark dataframe to a maximum number of rows for each value in a column?
I need to reduce a dataframe and export it to parquet. I need to make sure that I have, for example, 10,000 rows for each value in a column.
The dataframe I'm working with looks like this:
+-------------+-------------------+
| Make| Model|
+-------------+-------------------+
| PONTIAC| GRAND AM|
| BUICK| CENTURY|
| LEXUS| IS 300|
|MERCEDES-BENZ| SL-CLASS|
| PONTIAC| GRAND AM|
| TOYOTA| PRIUS|
| MITSUBISHI| MONTERO SPORT|
|MERCEDES-BENZ| SLK-CLASS|
| TOYOTA| CAMRY|
| JEEP| WRANGLER|
| CHEVROLET| SILVERADO 1500|
| TOYOTA| AVALON|
| FORD| RANGER|
|MERCEDES-BENZ| C-CLASS|
| TOYOTA| TUNDRA|
| FORD|EXPLORER SPORT TRAC|
| CHEVROLET| COLORADO|
| MITSUBISHI| MONTERO|
| DODGE| GRAND CARAVAN|
+-------------+-------------------+
I need to return at most 10,000 rows per Model; the current row counts per Model look like this:
+--------------------+-------+
| Model| count|
+--------------------+-------+
| MDX|1658647|
| ASTRO| 682657|
| ENTOURAGE| 72622|
| ES 300H| 80712|
| 6 SERIES| 145252|
| GRAN FURY| 9719|
|RANGE ROVER EVOQU...| 4290|
| LEGACY WAGON| 2070|
| LEGACY SEDAN| 104|
| DAKOTA CHASSIS CAB| 8|
| CAMARO|2028678|
| XT| 10009|
| DYNASTY| 171776|
| 944| 43044|
| F430 SPIDER| 506|
|FLEETWOOD SEVENTY...| 6|
| MONTE CARLO|1040806|
| LIBERTY|2415456|
| ESCALADE| 798832|
| SIERRA 3500 CLASSIC| 9541|
+--------------------+-------+
This is not the same, because, as suggested below, that only retrieves rows whose count is greater than some value. I want: for each value in df['Model']: limit rows for that value (model) to 10,000 if there are 10,000 or more rows (obviously pseudocode). In other words, if there are more than 10,000 rows, drop the rest; otherwise keep all of them.
Simply:
import pyspark.sql.functions as F
# Count the rows per Model
df = df.groupBy("Model").agg(F.count(F.lit(1)).alias("Count"))
# Keep only the models with fewer than 10,000 rows
df = df.filter(df["Count"] < 10000).select("Model", "Count")
df.write.parquet("data.parquet")
If I understand your question correctly, you want to sample some rows (for example 10,000), but only for records whose count is greater than 10,000. If that is the case, this is the answer:
from pyspark.sql.functions import count, lit
df = df.groupBy('Make', 'Model').agg(count(lit(1)).alias('count'))
df = df.filter(df['count'] > 10000).select('Model', 'count')
df.write.parquet('output.parquet')
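This only writes out the (Model, count) pairs; if you also want the sampling step itself, a rough sketch (assuming the un-aggregated dataframe is still available, here called raw_df, and that 'sampled.parquet' is just an example path) could look like this:
from pyspark.sql import Window
from pyspark.sql import functions as F
# Models whose total row count exceeds 10,000
eligible = raw_df.groupBy('Model').count().filter(F.col('count') > 10000).select('Model')
# Randomly shuffle rows within each Model and keep at most 10,000 of them
win = Window.partitionBy('Model').orderBy(F.rand())
sampled = (raw_df.join(eligible, 'Model')
           .withColumn('rn', F.row_number().over(win))
           .filter(F.col('rn') <= 10000)
           .drop('rn'))
sampled.write.parquet('sampled.parquet')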
I will slightly modify the given problem so that it can be visualized here, by reducing the maximum number of rows per distinct value to 2 (instead of 10,000).
Example dataframe:
df = spark.createDataFrame(
[('PONTIAC', 'GRAND AM'), ('BUICK', 'CENTURY'), ('LEXUS', 'IS 300'), ('MERCEDES-BENZ', 'SL-CLASS'), ('PONTIAC', 'GRAND AM'), ('TOYOTA', 'PRIUS'), ('MITSUBISHI', 'MONTERO SPORT'), ('MERCEDES-BENZ', 'SLK-CLASS'), ('TOYOTA', 'CAMRY'), ('JEEP', 'WRANGLER'), ('MERCEDES-BENZ', 'SL-CLASS'), ('PONTIAC', 'GRAND AM'), ('TOYOTA', 'PRIUS'), ('MITSUBISHI', 'MONTERO SPORT'), ('MERCEDES-BENZ', 'SLK-CLASS'), ('TOYOTA', 'CAMRY'), ('JEEP', 'WRANGLER'), ('CHEVROLET', 'SILVERADO 1500'), ('TOYOTA', 'AVALON'), ('FORD', 'RANGER'), ('MERCEDES-BENZ', 'C-CLASS'), ('TOYOTA', 'TUNDRA'), ('TOYOTA', 'PRIUS'), ('MITSUBISHI', 'MONTERO SPORT'), ('MERCEDES-BENZ', 'SLK-CLASS'), ('TOYOTA', 'CAMRY'), ('JEEP', 'WRANGLER'), ('CHEVROLET', 'SILVERADO 1500'), ('TOYOTA', 'AVALON'), ('FORD', 'RANGER'), ('MERCEDES-BENZ', 'C-CLASS'), ('TOYOTA', 'TUNDRA'), ('FORD', 'EXPLORER SPORT TRAC'), ('CHEVROLET', 'COLORADO'), ('MITSUBISHI', 'MONTERO'), ('DODGE', 'GRAND CARAVAN')],
['Make', 'Model']
)
Let's count the rows per Model:
df.groupby('Model').count().show()
+-------------------+-----+
| Model|count|
+-------------------+-----+
| AVALON| 2|
| CENTURY| 1|
| TUNDRA| 2|
| WRANGLER| 3|
| GRAND AM| 3|
|EXPLORER SPORT TRAC| 1|
| C-CLASS| 2|
| MONTERO SPORT| 3|
| CAMRY| 3|
| GRAND CARAVAN| 1|
| SILVERADO 1500| 2|
| PRIUS| 3|
| MONTERO| 1|
| COLORADO| 1|
| RANGER| 2|
| SLK-CLASS| 3|
| SL-CLASS| 2|
| IS 300| 1|
+-------------------+-----+
If I understand your question correctly, you can partition by Model and assign a row number within each partition:
from pyspark.sql import Window
from pyspark.sql.functions import row_number, desc
# Number the rows within each Model partition (ordered by Make, descending)
win_1 = Window.partitionBy('Model').orderBy(desc('Make'))
df = df.withColumn('row_num', row_number().over(win_1))
Then filter the rows down to row_num <= 2:
df = df.filter(df.row_num <= 2).select('Make', 'Model')
In total there should be 2+1+2+2+2+1+2+2+2+1+2+2+1+1+2+2+2+1 = 30 rows.
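As a quick sanity check (assuming df now holds the filtered result from the line above), the total can be verified with:
df.count()  # should return 30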
Final result:
+-------------+-------------------+
| Make| Model|
+-------------+-------------------+
| TOYOTA| AVALON|
| TOYOTA| AVALON|
| BUICK| CENTURY|
| TOYOTA| TUNDRA|
| TOYOTA| TUNDRA|
| JEEP| WRANGLER|
| JEEP| WRANGLER|
| PONTIAC| GRAND AM|
| PONTIAC| GRAND AM|
| FORD|EXPLORER SPORT TRAC|
|MERCEDES-BENZ| C-CLASS|
|MERCEDES-BENZ| C-CLASS|
| MITSUBISHI| MONTERO SPORT|
| MITSUBISHI| MONTERO SPORT|
| TOYOTA| CAMRY|
| TOYOTA| CAMRY|
| DODGE| GRAND CARAVAN|
| CHEVROLET| SILVERADO 1500|
| CHEVROLET| SILVERADO 1500|
| TOYOTA| PRIUS|
| TOYOTA| PRIUS|
| MITSUBISHI| MONTERO|
| CHEVROLET| COLORADO|
| FORD| RANGER|
| FORD| RANGER|
|MERCEDES-BENZ| SLK-CLASS|
|MERCEDES-BENZ| SLK-CLASS|
|MERCEDES-BENZ| SL-CLASS|
|MERCEDES-BENZ| SL-CLASS|
| LEXUS| IS 300|
+-------------+-------------------+
I think you should use row_number together with a window, with orderBy and partitionBy, to query the result, and then you can filter by your limit. For example, to take a random shuffle and limit the sample to 10,000 rows per value, do it like this:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Shuffle rows randomly within each Model partition and keep at most 10,000 of them
window = Window.partitionBy(df['Model']).orderBy(F.rand())
df = df.select(F.col('*'),
               F.row_number().over(window).alias('row_number')) \
       .where(F.col('row_number') <= 10000)
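To get back to the parquet export from the original question, you would then typically drop the helper column before writing; a minimal sketch (the output path is just an example):
df = df.drop('row_number')
df.write.parquet('data.parquet')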