Understanding the number of partitions created by Spark

How many partitions does pyspark-sql create when reading a .csv file?

My understanding is that number of partitions = math.ceil(file_size / spark.conf.get('spark.sql.files.maxPartitionBytes')).
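As a quick back-of-the-envelope check of that assumption (using my ~96 MB file and the 128 MB default):

import math

file_size = 96 * 1024 * 1024            # my CSV is roughly 96 MB on disk
max_partition_bytes = 134217728         # default spark.sql.files.maxPartitionBytes (128 MB)

# My assumption: one partition per maxPartitionBytes-sized chunk of the file.
print(math.ceil(file_size / max_partition_bytes))   # 1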

On my machine:

spark.conf.get('spark.sql.files.maxPartitionBytes')
output:
'134217728b'  # 128 MB

However, that is not the behavior I observe. I created a file that takes up 96 MB on disk, and I am running Spark in local mode on an 8-core laptop. I expected the file to be read into 1 partition, but it is being read into 8 partitions. Here is the code I used:

import pandas as pd
import numpy as np
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Create a small DataFrame and write it to CSV. The file occupies ~96 MB on disk.
pd.DataFrame({
    'id': np.arange(10000000),
    'b': np.random.choice(['a', 'b', 'c', 'd'], size=(10000000,), p=[0.25, 0.25, 0.25, 0.25])
}).to_csv('df_s.csv', index=None)

# Read the CSV back with Spark and check how many partitions it was split into.
sd = spark.read.csv('df_s.csv', schema="id int, b string")
sd.rdd.getNumPartitions()
output: 8

Can you help me understand why I see 8 partitions regardless of the file size?

The actual formula is a bit more involved than that. See the calculation below; you can find the source code here.

Here are your configuration and input files:

Spark configuration                 Value   Default
spark.sql.files.maxPartitionBytes   128M    128M
spark.sql.files.openCostInBytes     4M      4M
spark.executor.instances            1       local
spark.executor.cores                8       your cores
spark.default.parallelism           8       = spark.executor.instances * spark.executor.cores
data files size                     64M
data files count                    1
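If you want to confirm these values on your own session, you can read them back at runtime (a quick sketch; note that the SQL configs come back as byte strings such as '134217728b'):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# File-source configs; the defaults are returned as byte strings.
print(spark.conf.get('spark.sql.files.maxPartitionBytes'))  # e.g. '134217728b'
print(spark.conf.get('spark.sql.files.openCostInBytes'))    # e.g. '4194304b'

# In local mode with 8 cores this is 8 (one slot per core).
print(spark.sparkContext.defaultParallelism)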

Here is the actual formula:

Formula                                                                               Bytes
DefaultMaxSplitBytes = spark.sql.files.maxPartitionBytes                              134,217,728
OpenCostInBytes      = spark.sql.files.openCostInBytes                                4,194,304
DefaultParallelism   = spark.default.parallelism                                      8
TotalBytes           = DataBytes + (# files * OpenCostInBytes)                        71,303,168
BytesPerCore         = TotalBytes / DefaultParallelism                                8,912,896
MaxSplitBytes        = MIN(DefaultMaxSplitBytes, MAX(OpenCostInBytes, BytesPerCore))  8,912,896
Estimated number of partitions = TotalBytes / MaxSplitBytes                           8
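The same arithmetic in a few lines of Python, so you can plug in your own numbers (a sketch of the formula above, not Spark's actual split code; the 64M data size is the one used in the table):

# Sketch of how Spark sizes file splits when reading, mirroring the table above.
max_partition_bytes = 134217728       # spark.sql.files.maxPartitionBytes (128 MB)
open_cost_in_bytes  = 4194304         # spark.sql.files.openCostInBytes (4 MB)
default_parallelism = 8               # spark.default.parallelism (8 cores in local mode)

data_bytes = 64 * 1024 * 1024         # total size of the data files (64 MB in this example)
num_files  = 1

total_bytes     = data_bytes + num_files * open_cost_in_bytes
bytes_per_core  = total_bytes / default_parallelism
max_split_bytes = min(max_partition_bytes, max(open_cost_in_bytes, bytes_per_core))

estimated_partitions = total_bytes / max_split_bytes
print(int(estimated_partitions))      # 8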