我们如何在 pyspark 中获取数据框每个分区的样本?

how can we get a sample of each partition of a dataframe in pyspark?

我正在尝试在 pyspark 中对数据帧进行重新分区,出于好奇,我想从每个分区中获取行样本,看看它是如何工作的。理想情况下,我们将有一个函数接受数据帧、分区索引和样本分数(例如 0.1 将 return 分区中 10% 的行)和 returns 相应的较小数据帧。

我在 scala 中看到 mapPartitionsWithIndex 可以用于底层 RDD () but I could not figure how to do this pyspark (by reading https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.mapPartitionsWithIndex.html?highlight=mappartition#pyspark.RDD.mapPartitionsWithIndex)。这个功能究竟是如何工作的?或者有更好的解决方案?

我没有使用 mapPartitionsWithIndex,而是使用函数 spark_partition_id.
找到了一个简单的解决方案 您可以在下面找到一个简短的示例。

import pyspark.sql.functions as F

# create example dataframe with numbers from 1 to 100
df = spark.createDataFrame([tuple([1 + n]) for n in range(100)], ['number'])
df.rdd.getNumPartitions()   # => 8



# custom function to sample rows within partitions
def resample_in_partition(df, fraction, partition_col_name='partition_id', seed=42):
  
  # create dictionary of sampling fractions per `partition_col_name`
  fractions = df\
    .select(partition_col_name)\
    .distinct()\
    .withColumn('fraction', F.lit(fraction))\
    .rdd.collectAsMap()
  
  # stratified sampling
  sampled_df = df.stat.sampleBy(partition_col_name, fractions, seed)

  return sampled_df



df = df.withColumn('partition_id', F.spark_partition_id())
df = resample_in_partition(df, fraction=0.1)

df.show()

+------+------------+
|number|partition_id|
+------+------------+
|     8|           0|
|    22|           1|
|    44|           3|
|    49|           4|
|    50|           4|
|    57|           4|
|    64|           5|
|    86|           7|
+------+------------+

由于我的数据帧很小,近似重采样可以为每个分区提供不同的行数。对于大型数据集,这个问题应该不太明显。