使用 Spark 对 Parquet 文件进行计数操作
Count operation on Parquet Files using Spark
我在 HDFS 中有两组相同的 Parquet 格式数据。
一组按 col1
排序,另一组未排序。 sorted_table 约为 127 GB,而 unsorted_table 约为 117GB。
大小在这里无关紧要。
我 运行 使用 Spark SQL 进行了两次查询:
select col1, count(*) from sorted_table where col1 = someInt group by col1
select col1, count(*) from unsorted_table where col1 = someInt group by col1
我在 spark UI 上分析了这些查询,我发现 sorted_table 上的查询只读取了 127 MB 的数据,而 unsorted_table 上的查询读取了 35 GB 的数据来计算计数。
所以我的问题是:
- Spark如何通过读取较少的数据来计算计数?
- 为什么 sorted_table 上的作业读取的数据比
unsorted_table?
Parquet 文件存储在称为行组的块中。每个行组都可以有一些与每个 field/column 相关联的元数据,包括行数、最小值和最大值。由于您的数据已排序,Spark 可以根据这些范围完全跳过数据的大块。
使用 parquet_reader
,这是我的 Parquet 文件的摘录:
Column 2
, values: 35957, null values: 0, distinct values: 0
max: 17305, min: 17305
compression: SNAPPY, encodings: RLE PLAIN
uncompressed size: 143866, compressed size: 6800
这是一个 DATE
字段,所有值都相同,因此最大值和最小值相同。但是如果我正在寻找一个特定的日期范围,Spark 可以使用它来决定实际数据是否值得检查。
这里有关于行组的更多信息:https://parquet.apache.org/documentation/latest/
但我没有看到 min
和 max
列出......这可能是特定于实现的。
我在 HDFS 中有两组相同的 Parquet 格式数据。
一组按 col1
排序,另一组未排序。 sorted_table 约为 127 GB,而 unsorted_table 约为 117GB。
大小在这里无关紧要。
我 运行 使用 Spark SQL 进行了两次查询:
select col1, count(*) from sorted_table where col1 = someInt group by col1
select col1, count(*) from unsorted_table where col1 = someInt group by col1
我在 spark UI 上分析了这些查询,我发现 sorted_table 上的查询只读取了 127 MB 的数据,而 unsorted_table 上的查询读取了 35 GB 的数据来计算计数。
所以我的问题是:
- Spark如何通过读取较少的数据来计算计数?
- 为什么 sorted_table 上的作业读取的数据比 unsorted_table?
Parquet 文件存储在称为行组的块中。每个行组都可以有一些与每个 field/column 相关联的元数据,包括行数、最小值和最大值。由于您的数据已排序,Spark 可以根据这些范围完全跳过数据的大块。
使用 parquet_reader
,这是我的 Parquet 文件的摘录:
Column 2
, values: 35957, null values: 0, distinct values: 0
max: 17305, min: 17305
compression: SNAPPY, encodings: RLE PLAIN
uncompressed size: 143866, compressed size: 6800
这是一个 DATE
字段,所有值都相同,因此最大值和最小值相同。但是如果我正在寻找一个特定的日期范围,Spark 可以使用它来决定实际数据是否值得检查。
这里有关于行组的更多信息:https://parquet.apache.org/documentation/latest/
但我没有看到 min
和 max
列出......这可能是特定于实现的。