根据节点数 pyspark 拆分我的数据框
split my dataframe depending on the number of nodes pyspark
我正在尝试根据(我的集群的)节点数拆分我的数据帧,
我的数据框看起来像:
如果我有 node=2,并且 dataframe.count=7 :
因此,要应用迭代方法,拆分的结果将是:
我的问题是:我该怎么做?
您可以使用其中一个 rdd 分区函数来做到这一点(查看下面的代码),但我不推荐它,因为
只要您不完全了解自己在做什么以及这样做的原因。一般来说(或者对大多数用例来说更好)最好让 spark 处理数据分布。
import pyspark.sql.functions as F
import itertools
import math
#creating a random dataframe
l = [(x,x+2) for x in range(1009)]
columns = ['one', 'two']
df=spark.createDataFrame(l, columns)
#create on partition to asign a partition key
df = df.coalesce(1)
#number of nodes (==partitions)
pCount = 5
#creating a list of partition keys
#basically it repeats range(5) several times until we have enough keys for each row
partitionKey = list(itertools.chain.from_iterable(itertools.repeat(x, math.ceil(df.count()/pCount)) for x in range(pCount)))
#now we can distribute the data to the partitions
df = df.rdd.partitionBy(pCount, partitionFunc = lambda x: partitionKey.pop()).toDF()
#This shows us the number of records within each partition
df.withColumn("partition_id", F.spark_partition_id()).groupBy("partition_id").count().show()
输出:
+------------+-----+
|partition_id|count|
+------------+-----+
| 1| 202|
| 3| 202|
| 4| 202|
| 2| 202|
| 0| 201|
+------------+-----+
我正在尝试根据(我的集群的)节点数拆分我的数据帧,
我的数据框看起来像:
如果我有 node=2,并且 dataframe.count=7 :
因此,要应用迭代方法,拆分的结果将是:
我的问题是:我该怎么做?
您可以使用其中一个 rdd 分区函数来做到这一点(查看下面的代码),但我不推荐它,因为
只要您不完全了解自己在做什么以及这样做的原因。一般来说(或者对大多数用例来说更好)最好让 spark 处理数据分布。
import pyspark.sql.functions as F
import itertools
import math
#creating a random dataframe
l = [(x,x+2) for x in range(1009)]
columns = ['one', 'two']
df=spark.createDataFrame(l, columns)
#create on partition to asign a partition key
df = df.coalesce(1)
#number of nodes (==partitions)
pCount = 5
#creating a list of partition keys
#basically it repeats range(5) several times until we have enough keys for each row
partitionKey = list(itertools.chain.from_iterable(itertools.repeat(x, math.ceil(df.count()/pCount)) for x in range(pCount)))
#now we can distribute the data to the partitions
df = df.rdd.partitionBy(pCount, partitionFunc = lambda x: partitionKey.pop()).toDF()
#This shows us the number of records within each partition
df.withColumn("partition_id", F.spark_partition_id()).groupBy("partition_id").count().show()
输出:
+------------+-----+
|partition_id|count|
+------------+-----+
| 1| 202|
| 3| 202|
| 4| 202|
| 2| 202|
| 0| 201|
+------------+-----+