我们如何在 pyspark 中获取数据框每个分区的样本?
how can we get a sample of each partition of a dataframe in pyspark?
我正在尝试在 pyspark 中对数据帧进行重新分区,出于好奇,我想从每个分区中获取行样本,看看它是如何工作的。理想情况下,我们将有一个函数接受数据帧、分区索引和样本分数(例如 0.1 将 return 分区中 10% 的行)和 returns 相应的较小数据帧。
我在 scala 中看到 mapPartitionsWithIndex 可以用于底层 RDD () but I could not figure how to do this pyspark (by reading https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.mapPartitionsWithIndex.html?highlight=mappartition#pyspark.RDD.mapPartitionsWithIndex)。这个功能究竟是如何工作的?或者有更好的解决方案?
我没有使用 mapPartitionsWithIndex
,而是使用函数 spark_partition_id
.
找到了一个简单的解决方案
您可以在下面找到一个简短的示例。
import pyspark.sql.functions as F
# create example dataframe with numbers from 1 to 100
df = spark.createDataFrame([tuple([1 + n]) for n in range(100)], ['number'])
df.rdd.getNumPartitions() # => 8
# custom function to sample rows within partitions
def resample_in_partition(df, fraction, partition_col_name='partition_id', seed=42):
# create dictionary of sampling fractions per `partition_col_name`
fractions = df\
.select(partition_col_name)\
.distinct()\
.withColumn('fraction', F.lit(fraction))\
.rdd.collectAsMap()
# stratified sampling
sampled_df = df.stat.sampleBy(partition_col_name, fractions, seed)
return sampled_df
df = df.withColumn('partition_id', F.spark_partition_id())
df = resample_in_partition(df, fraction=0.1)
df.show()
+------+------------+
|number|partition_id|
+------+------------+
| 8| 0|
| 22| 1|
| 44| 3|
| 49| 4|
| 50| 4|
| 57| 4|
| 64| 5|
| 86| 7|
+------+------------+
由于我的数据帧很小,近似重采样可以为每个分区提供不同的行数。对于大型数据集,这个问题应该不太明显。
我正在尝试在 pyspark 中对数据帧进行重新分区,出于好奇,我想从每个分区中获取行样本,看看它是如何工作的。理想情况下,我们将有一个函数接受数据帧、分区索引和样本分数(例如 0.1 将 return 分区中 10% 的行)和 returns 相应的较小数据帧。
我在 scala 中看到 mapPartitionsWithIndex 可以用于底层 RDD (
我没有使用 mapPartitionsWithIndex
,而是使用函数 spark_partition_id
.
找到了一个简单的解决方案
您可以在下面找到一个简短的示例。
import pyspark.sql.functions as F
# create example dataframe with numbers from 1 to 100
df = spark.createDataFrame([tuple([1 + n]) for n in range(100)], ['number'])
df.rdd.getNumPartitions() # => 8
# custom function to sample rows within partitions
def resample_in_partition(df, fraction, partition_col_name='partition_id', seed=42):
# create dictionary of sampling fractions per `partition_col_name`
fractions = df\
.select(partition_col_name)\
.distinct()\
.withColumn('fraction', F.lit(fraction))\
.rdd.collectAsMap()
# stratified sampling
sampled_df = df.stat.sampleBy(partition_col_name, fractions, seed)
return sampled_df
df = df.withColumn('partition_id', F.spark_partition_id())
df = resample_in_partition(df, fraction=0.1)
df.show()
+------+------------+
|number|partition_id|
+------+------------+
| 8| 0|
| 22| 1|
| 44| 3|
| 49| 4|
| 50| 4|
| 57| 4|
| 64| 5|
| 86| 7|
+------+------------+
由于我的数据帧很小,近似重采样可以为每个分区提供不同的行数。对于大型数据集,这个问题应该不太明显。