为什么聚合的 Spark Parquet 文件比原始文件大?

Why are Spark Parquet files for an aggregate larger than the original?

我正在尝试为最终用户创建一个聚合文件,以避免让他们处理具有更大文件的多个源。为此,我: A) 遍历所有源文件夹,去除最常请求的 12 个字段,在这些结果位于同一位置的新位置分离出 parquet 文件。 B) 我尝试返回在步骤 A 中创建的文件,并通过按 12 个字段分组以将其减少为每个唯一组合的摘要行来重新聚合它们。

我发现步骤 A 减少了负载 5:1(大约 250 gigs 变成 48.5 gigs)。但是,步骤 B 没有进一步减少,而是比步骤 A 增加了 50%。但是,我的计数匹配。

这是使用 Spark 1.5.2
我的代码仅修改为将字段名称替换为 field1...field12 以使其更具可读性,下面是我注意到的结果。

虽然我不一定期望另一个 5:1 减少,但我不知道我在为具有相同模式的更少行增加存储方面做错了什么。任何人都可以帮助我理解我做错了什么?

谢谢!

//for each eventName found in separate source folders, do the following:
//spit out one row with key fields from the original dataset for quicker availability to clients 
//results in a 5:1 reduction in size
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, cast(1 as bigint) as rCount from table"
sqlContext.sql(sqlCommand).coalesce(20).write.parquet("<aws folder>" + dt + "/" + eventName + "/")
//results in over 700 files with a total of  16,969,050,506 rows consuming 48.65 gigs of storage space in S3, compressed 

//after all events are processed, aggregate the results
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, sum(rCount) as rCount from results group by field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12"
//Use a wildcard to search all sub-folders created above
sqlContext.read.parquet("<aws folder>" + dt + "/*/").registerTempTable("results")
sqlContext.sql(sqlStatement).coalesce(20).saveAsParquetFile("<a new aws folder>" + dt + "/")
//This results in  3,295,206,761 rows with an aggregate value of 16,969,050,506 for rCount but consumes 79.32 gigs of storage space in S3, compressed

//The parquet schemas created (both tables match):
 |-- field1: string (nullable = true) (10 characters)
 |-- field2: string (nullable = true) (15 characters)
 |-- field3: string (nullable = true) (50 characters max)
 |-- field4: string (nullable = true) (10 characters)
 |-- field5: string (nullable = true) (10 characters)
 |-- field6: string (nullable = true) (10 characters)
 |-- field7: string (nullable = true) (16 characters)
 |-- field8: string (nullable = true) (10 characters)
 |-- field9 string (nullable = true)  (15 characters)
 |-- field10: string (nullable = true)(20 characters)
 |-- field11: string (nullable = true)(14 characters)
 |-- field12: string (nullable = true)(14 characters)
 |-- rCount: long (nullable = true)   
 |-- dt: string (nullable = true)

一般来说,像 Parquet 这样的列式存储格式在数据分布(数据组织)和各个列的基数方面非常敏感。数据越有条理,基数越低,存储效率越高。

作为您应用的聚合,必须对数据进行混洗。当您检查执行计划时,您会看到它正在使用散列分区程序。这意味着聚合后的分发效率可能低于原始数据。同时 sum 可以减少行数但增加 rCount 列的基数。

您可以尝试不同的工具来纠正这个问题,但并非所有工具都适用于 Spark 1.5.2:

  • 按基数较低的列(由于完全随机化而非常昂贵)或 sortWithinPartitions.
  • 对完整数据集进行排序
  • 使用 DataFrameWriterpartitionBy 方法使用低基数列对数据进行分区。
  • 使用 DataFrameWriter (Spark 2.0.0+) 的 bucketBysortBy 方法使用分桶和本地排序改进数据分布。