优化从 s3 存储桶中的分区镶木地板文件读取

optimizing reading from partitioned parquet files in s3 bucket

我有一个 parquet 格式的大型数据集(大小约为 1TB),分为 2 个层次结构:CLASSDATE 只有7个类。但是日期从 2020-01-01 开始不断增加。 我的数据先按 CLASS 分区,然后 DATE

所以像这样:

CLASS1---DATE 1
      ---DATE 2
      ---  .
      ---  .
      ---  .
      ---DATE N

CLASS2---DATE 1
      ---DATE 2
      ---  .
      ---  .
      ---  .
      ---DATE N

我通过 CLASS 在 for 循环中加载我的数据。如果我加载整个 parquet 文件,YARN 会终止作业,因为它会使内存实例过载。但是自从我在建模中进行百分位数计算以来,我一直在加载。此方法大约需要 23 小时才能完成。

但是,如果我重新分区使得我只有 CLASS 分区,则该工作大约需要 10 个小时。 有太多的子分区会减慢火花执行器的工作吗? 我将分区层次结构保持为 CLASS -> DATE 只是因为我需要每天按 DATE 追加新数据。 如果只有 1 个分区更有效,那么我必须每天在加载新数据后重新分区到 CLASS 分区。 有人可以解释为什么单个分区工作得更快吗?如果是这样,通过附加而不重新分区整个数据集来每天对数据进行分区的最佳方法是什么?

谢谢

编辑: 我在文件结构上使用 for 循环按 CLASS 分区循环,如下所示:

fs = s3fs.S3FileSystem(anon=False)    
inpath="s3://bucket/file.parquet/"

Dirs= fs.ls(inpath)
for paths in Dirs:
    customPath='s3://' + uvapath + '/'
    class=uvapath.split('=')[1]
    df=spark.read.parquet(customPath)
    outpath="s3://bucket/Output_" + class + ".parquet"
#Perform calculations
df.write.mode('overwrite').parquet(outpath)

加载的 df 将包含 CLASS=1 的所有日期。然后我将文件输出为每个 CLASS 的单独镶木地板文件,这样我就有 7 个镶木地板文件:

Output_1.parquet
Output_2.parquet
Output_3.parquet
Output_4.parquet
Output_5.parquet
Output_6.parquet
Output_7.parquet

然后我将 7 个 parquet 合并为一个 parquet 不是问题,因为生成的 parquet 文件要小得多。

我有包含年、月和 ID 三列的分区数据。文件夹路径层级为

year=2020/month=08/id=1/*.parquet
year=2020/month=08/id=2/*.parquet
year=2020/month=08/id=3/*.parquet
...
year=2020/month=09/id=1/*.parquet
year=2020/month=09/id=2/*.parquet
year=2020/month=09/id=3/*.parquet

并且我可以通过加载根路径来读取 DataFrame。

val df = spark.read.parquet("s3://mybucket/")

然后,分区列自动添加到DataFrame。现在,您可以按照

的方式过滤分区列的数据
val df_filtered = df.filter("year = '2020' and month = '09'")

并使用 df_filtered 做一些事情,然后 spark 将只使用分区数据!


对于你的重复处理,可以使用spark的fair scheduler。使用以下代码将 fair.xml 文件添加到项目的 src/main/resources 中,

<?xml version="1.0"?>

<allocations>
    <pool name="fair">
        <schedulingMode>FAIR</schedulingMode>
        <weight>10</weight>
        <minShare>0</minShare>
    </pool>
</allocations>

并在创建 spark 会话后设置 spark 配置。

spark.sparkContext.setLocalProperty("spark.scheduler.mode", "FAIR")
spark.sparkContext.setLocalProperty("spark.scheduler.allocation.file", getClass.getResource("/fair.xml").getPath)
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "fair")

然后你就可以并行完成你的工作了。您可能希望并行化作业取决于 CLASS,因此

val classes = (1 to 7).par
val date = '2020-09-25'

classes foreach { case i =>

    val df_filtered = df.filter(s"CLASS == '$i' and DATE = '$date'")
    
    // Do your job

}

代码将同时使用不同的 CLASS 值。