Understanding the number of partitions created by Spark
How many partitions does PySpark SQL create when reading a .csv file?

My understanding is:

```
number of partitions = math.ceil(file_size / spark.conf.get('spark.sql.files.maxPartitionBytes'))
```
On my machine:

```python
spark.conf.get('spark.sql.files.maxPartitionBytes')
# output: '134217728b'  (128 MB)
```
However, that is not the behavior I observe. I created a file that occupies 96 MB on disk, and I am running Spark in local mode on an 8-core laptop. I expected the file to be read into a single partition, but it is being read into 8 partitions. Here is the code I used:
```python
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Create a small DataFrame; the resulting CSV occupies ~96 MB on disk.
pd.DataFrame({'id': np.arange(10000000),
              'b': np.random.choice(['a', 'b', 'c', 'd'],
                                    size=(10000000,),
                                    p=[0.25, 0.25, 0.25, 0.25])}).to_csv('df_s.csv', index=None)

sd = spark.read.csv('df_s.csv', schema="id int, b string")
sd.rdd.getNumPartitions()
# output: 8
```
Can you help me understand why I see 8 partitions regardless of the file size?
The actual formula is a bit more complicated than that; see the calculation below. You can find the source code here.
Here are your configuration and file details:
| Spark Configuration | Value | Default |
|---|---|---|
| spark.sql.files.maxPartitionBytes | 128M | 128M |
| spark.sql.files.openCostInBytes | 4M | 4M |
| spark.executor.instances | 1 | local |
| spark.executor.cores | 8 | your cores |
| spark.default.parallelism | 8 | = spark.executor.instances * spark.executor.cores |
| data files size | 64M | |
| data files count | 1 | |
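If you want to check these values in your own session, you can query them directly. A minimal sketch (these config keys are standard Spark settings, but the exact output strings vary by version and deployment):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# The two file-source settings used in the calculation below.
print(spark.conf.get('spark.sql.files.maxPartitionBytes'))  # e.g. '134217728b' (128 MB)
print(spark.conf.get('spark.sql.files.openCostInBytes'))    # e.g. '4194304b' (4 MB)

# In local mode, the default parallelism typically equals the number of cores.
print(spark.sparkContext.defaultParallelism)                # e.g. 8 on an 8-core laptop
```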
And here is the actual formula:
| Formula | | Bytes |
|---|---|---|
| DefaultMaxSplitBytes | = spark.sql.files.maxPartitionBytes | 134,217,728 |
| OpenCostInBytes | = spark.sql.files.openCostInBytes | 4,194,304 |
| DefaultParallelism | = spark.default.parallelism | 8 |
| TotalBytes | = DataBytes + (# files * OpenCostInBytes) | 71,303,168 |
| BytesPerCore | = TotalBytes / DefaultParallelism | 8,912,896 |
| MaxSplitBytes | = MIN(DefaultMaxSplitBytes, MAX(OpenCostInBytes, BytesPerCore)) | 8,912,896 |
| Estimated number of partitions | = TotalBytes / MaxSplitBytes | 8 |
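As a sanity check, here is a small Python sketch of the same calculation. The helper name `estimate_partitions` is mine, not Spark's, and the real planner also packs splits into partitions greedily, so treat this as an estimate rather than a faithful reimplementation:

```python
import math

def estimate_partitions(file_sizes,
                        max_partition_bytes=134217728,  # spark.sql.files.maxPartitionBytes (128 MB)
                        open_cost_in_bytes=4194304,     # spark.sql.files.openCostInBytes (4 MB)
                        default_parallelism=8):         # spark.default.parallelism
    """Estimate how many partitions Spark's file-source planning produces."""
    # Each file is padded with the open cost before the total is computed.
    total_bytes = sum(size + open_cost_in_bytes for size in file_sizes)
    bytes_per_core = total_bytes / default_parallelism
    max_split_bytes = min(max_partition_bytes,
                          max(open_cost_in_bytes, bytes_per_core))
    return math.ceil(total_bytes / max_split_bytes)

# One 64 MB file on an 8-core local machine -> 8 partitions.
print(estimate_partitions([64 * 1024 * 1024]))  # 8
```

Plugging in the question's 96 MB file gives the same answer: TotalBytes = 96 MB + 4 MB = 100 MB, BytesPerCore = 100 MB / 8 = 12.5 MB, so MaxSplitBytes = 12.5 MB and 100 MB / 12.5 MB = 8 partitions. As long as BytesPerCore stays between OpenCostInBytes and the 128 MB cap, a single file splits into exactly spark.default.parallelism partitions, which is why you see 8 regardless of file size.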