如何获取partition中的元素个数?
How to get the number of elements in partition?
在给定分区 ID 的情况下,是否有任何方法可以获取 spark RDD 分区中的元素数量?不扫描整个分区。
像这样:
Rdd.partitions().get(index).size()
除了我没有看到这样的 API for spark。有任何想法吗?解决方法?
谢谢
下面给出了一个新的 RDD,其中的元素是每个分区的大小:
rdd.mapPartitions(iter => Array(iter.size).iterator, true)
PySpark:
num_partitions = 20000
a = sc.parallelize(range(int(1e6)), num_partitions)
l = a.glom().map(len).collect() # get length of each partition
print(min(l), max(l), sum(l)/len(l), len(l)) # check if skewed
Spark/scala:
val numPartitions = 20000
val a = sc.parallelize(0 until 1e6.toInt, numPartitions )
val l = a.glom().map(_.length).collect() # get length of each partition
print(l.min, l.max, l.sum/l.length, l.length) # check if skewed
数据帧也可以这样做,而不仅仅是 RDD。
只需将 DF.rdd.glom... 添加到上面的代码中即可。
请注意,glom()
转换每个分区 into a list 的元素,因此它会占用大量内存。内存密集度较低的版本(仅限 pyspark 版本):
import statistics
def get_table_partition_distribution(table_name: str):
def get_partition_len (iterator):
yield sum(1 for _ in iterator)
l = spark.table(table_name).rdd.mapPartitions(get_partition_len, True).collect() # get length of each partition
num_partitions = len(l)
min_count = min(l)
max_count = max(l)
avg_count = sum(l)/num_partitions
stddev = statistics.stdev(l)
print(f"{table_name} each of {num_partitions} partition's counts: min={min_count:,} avg±stddev={avg_count:,.1f} ±{stddev:,.1f} max={max_count:,}")
get_table_partition_distribution('someTable')
输出类似于
someTable each of 1445 partition's counts:
min=1,201,201 avg±stddev=1,202,811.6 ±21,783.4 max=2,030,137
pzecevic 的答案有效,但从概念上讲,无需构造数组然后将其转换为迭代器。我会直接构造迭代器,然后通过 collect 调用获取计数。
rdd.mapPartitions(iter => Iterator(iter.size), true).collect()
P.S。不确定他的回答是否真的做了更多工作,因为 Iterator.apply 可能会将其参数转换为数组。
我知道我来晚了,但我有另一种方法可以利用 spark 的内置函数来获取分区中的元素数量。它适用于 2.1 以上的 spark 版本。
说明:
我们将创建一个示例数据框 (df),获取分区 ID,根据分区 ID 进行分组,并对每条记录进行计数。
Pyspark:
>>> from pyspark.sql.functions import spark_partition_id, count as _count
>>> df = spark.sql("set -v").unionAll(spark.sql("set -v")).repartition(4)
>>> df.rdd.getNumPartitions()
4
>>> df.withColumn("partition_id", spark_partition_id()).groupBy("partition_id").agg(_count("key")).orderBy("partition_id").show()
+------------+----------+
|partition_id|count(key)|
+------------+----------+
| 0| 48|
| 1| 44|
| 2| 32|
| 3| 48|
+------------+----------+
斯卡拉:
scala> val df = spark.sql("set -v").unionAll(spark.sql("set -v")).repartition(4)
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [key: string, value: string ... 1 more field]
scala> df.rdd.getNumPartitions
res0: Int = 4
scala> df.withColumn("partition_id", spark_partition_id()).groupBy("partition_id").agg(count("key")).orderBy("partition_id").show()
+------------+----------+
|partition_id|count(key)|
+------------+----------+
| 0| 48|
| 1| 44|
| 2| 32|
| 3| 48|
+------------+----------+
在给定分区 ID 的情况下,是否有任何方法可以获取 spark RDD 分区中的元素数量?不扫描整个分区。
像这样:
Rdd.partitions().get(index).size()
除了我没有看到这样的 API for spark。有任何想法吗?解决方法?
谢谢
下面给出了一个新的 RDD,其中的元素是每个分区的大小:
rdd.mapPartitions(iter => Array(iter.size).iterator, true)
PySpark:
num_partitions = 20000
a = sc.parallelize(range(int(1e6)), num_partitions)
l = a.glom().map(len).collect() # get length of each partition
print(min(l), max(l), sum(l)/len(l), len(l)) # check if skewed
Spark/scala:
val numPartitions = 20000
val a = sc.parallelize(0 until 1e6.toInt, numPartitions )
val l = a.glom().map(_.length).collect() # get length of each partition
print(l.min, l.max, l.sum/l.length, l.length) # check if skewed
数据帧也可以这样做,而不仅仅是 RDD。 只需将 DF.rdd.glom... 添加到上面的代码中即可。
请注意,glom()
转换每个分区 into a list 的元素,因此它会占用大量内存。内存密集度较低的版本(仅限 pyspark 版本):
import statistics
def get_table_partition_distribution(table_name: str):
def get_partition_len (iterator):
yield sum(1 for _ in iterator)
l = spark.table(table_name).rdd.mapPartitions(get_partition_len, True).collect() # get length of each partition
num_partitions = len(l)
min_count = min(l)
max_count = max(l)
avg_count = sum(l)/num_partitions
stddev = statistics.stdev(l)
print(f"{table_name} each of {num_partitions} partition's counts: min={min_count:,} avg±stddev={avg_count:,.1f} ±{stddev:,.1f} max={max_count:,}")
get_table_partition_distribution('someTable')
输出类似于
someTable each of 1445 partition's counts: min=1,201,201 avg±stddev=1,202,811.6 ±21,783.4 max=2,030,137
pzecevic 的答案有效,但从概念上讲,无需构造数组然后将其转换为迭代器。我会直接构造迭代器,然后通过 collect 调用获取计数。
rdd.mapPartitions(iter => Iterator(iter.size), true).collect()
P.S。不确定他的回答是否真的做了更多工作,因为 Iterator.apply 可能会将其参数转换为数组。
我知道我来晚了,但我有另一种方法可以利用 spark 的内置函数来获取分区中的元素数量。它适用于 2.1 以上的 spark 版本。
说明: 我们将创建一个示例数据框 (df),获取分区 ID,根据分区 ID 进行分组,并对每条记录进行计数。
Pyspark:
>>> from pyspark.sql.functions import spark_partition_id, count as _count
>>> df = spark.sql("set -v").unionAll(spark.sql("set -v")).repartition(4)
>>> df.rdd.getNumPartitions()
4
>>> df.withColumn("partition_id", spark_partition_id()).groupBy("partition_id").agg(_count("key")).orderBy("partition_id").show()
+------------+----------+
|partition_id|count(key)|
+------------+----------+
| 0| 48|
| 1| 44|
| 2| 32|
| 3| 48|
+------------+----------+
斯卡拉:
scala> val df = spark.sql("set -v").unionAll(spark.sql("set -v")).repartition(4)
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [key: string, value: string ... 1 more field]
scala> df.rdd.getNumPartitions
res0: Int = 4
scala> df.withColumn("partition_id", spark_partition_id()).groupBy("partition_id").agg(count("key")).orderBy("partition_id").show()
+------------+----------+
|partition_id|count(key)|
+------------+----------+
| 0| 48|
| 1| 44|
| 2| 32|
| 3| 48|
+------------+----------+