Spark Parquet 统计(min/max)集成

Spark Parquet Statistics(min/max) integration

我一直在研究 Spark 如何在 Parquet 中存储统计信息 (min/max) 以及它如何使用信息进行查询优化。 我有几个问题。 首先搭建:Spark 2.1.0,下面搭建一个1000行的Dataframe,列为long类型和string类型。 不过,它们按不同的列排序。

scala> spark.sql("select id, cast(id as string) text from range(1000)").sort("id").write.parquet("/secret/spark21-sortById")
scala> spark.sql("select id, cast(id as string) text from range(1000)").sort("Text").write.parquet("/secret/spark21-sortByText")

我向 parquet-tools 添加了一些代码来打印统计信息并检查生成的 parquet 文件:

hadoop jar parquet-tools-1.9.1-SNAPSHOT.jar meta /secret/spark21-sortById/part-00000-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet 
file:        file:/secret/spark21-sortById/part-00000-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet 
creator:     parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf) 
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}},{"name":"text","type":"string","nullable":false,"metadata":{}}]} 

file schema: spark_schema 
--------------------------------------------------------------------------------
id:          REQUIRED INT64 R:0 D:0
text:        REQUIRED BINARY O:UTF8 R:0 D:0

row group 1: RC:5 TS:133 OFFSET:4 
--------------------------------------------------------------------------------
id:           INT64 SNAPPY DO:0 FPO:4 SZ:71/81/1.14 VC:5 ENC:PLAIN,BIT_PACKED STA:[min: 0, max: 4, num_nulls: 0]
text:         BINARY SNAPPY DO:0 FPO:75 SZ:53/52/0.98 VC:5 ENC:PLAIN,BIT_PACKED

hadoop jar parquet-tools-1.9.1-SNAPSHOT.jar meta /secret/spark21-sortByText/part-00000-3d7eac74-5ca0-44a0-b8a6-d67cc38a2bde.snappy.parquet 
file:        file:/secret/spark21-sortByText/part-00000-3d7eac74-5ca0-44a0-b8a6-d67cc38a2bde.snappy.parquet 
creator:     parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf) 
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}},{"name":"text","type":"string","nullable":false,"metadata":{}}]} 

file schema: spark_schema 
--------------------------------------------------------------------------------
id:          REQUIRED INT64 R:0 D:0
text:        REQUIRED BINARY O:UTF8 R:0 D:0

row group 1: RC:5 TS:140 OFFSET:4 
--------------------------------------------------------------------------------
id:           INT64 SNAPPY DO:0 FPO:4 SZ:71/81/1.14 VC:5 ENC:PLAIN,BIT_PACKED STA:[min: 0, max: 101, num_nulls: 0]
text:         BINARY SNAPPY DO:0 FPO:75 SZ:60/59/0.98 VC:5 ENC:PLAIN,BIT_PACKED

所以问题是为什么 Spark,特别是 2.1.0,只为数字列生成 min/max,而不是字符串(BINARY)字段,即使字符串字段包含在排序中?也许我错过了配置?

第二个问题,我如何确认 Spark 正在使用 min/max?

scala> sc.setLogLevel("INFO")
scala> spark.sql("select * from parquet.`/secret/spark21-sortById` where id=4").show

我有很多这样的行:

17/01/17 09:23:35 INFO FilterCompat: Filtering using predicate: and(noteq(id, null), eq(id, 4))
17/01/17 09:23:35 INFO FileScanRDD: Reading File path: file:///secret/spark21-sortById/part-00000-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet, range: 0-558, partition values: [empty row]
...
17/01/17 09:23:35 INFO FilterCompat: Filtering using predicate: and(noteq(id, null), eq(id, 4))
17/01/17 09:23:35 INFO FileScanRDD: Reading File path: file:///secret/spark21-sortById/part-00193-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet, range: 0-574, partition values: [empty row]
...

问题是 Spark 似乎正在扫描每个文件,即使从 min/max,Spark 应该能够确定只有 part-00000 具有相关数据。或者我读错了,Spark 正在跳过文件?也许 Spark 只能使用分区值进行数据跳过?

对于第一个问题,我认为这是一个定义问题(字符串的 min/max 是什么?词法排序?)但无论如何,据我所知,目前只有 spark 的 parquet索引数字。

关于第二个问题,我相信如果你看得更深一点,你会发现spark本身并没有加载文件。相反,它正在读取元数据,因此它知道是否要读取一个块。所以基本上它是将谓词推到文件(块)级别。

PARQUET-686 进行了更改,以在适当的时候故意忽略二进制字段的统计信息。您可以通过将 parquet.strings.signed-min-max.enabled 设置为 true 来覆盖此行为。

设置该配置后,您可以使用 parquet-tools 在二进制字段中读取 min/max。

更多详情见my another Whosebug question

Spark-2.4.0版本已解决此问题。在这里,他们将 parquet 版本从 1.8.2 升级到 1.10.0。

[SPARK-23972] 将 Parquet 从 1.8.2 更新到 1.10.0

有了这些所有的列类型,无论它们是Int/String/Decimal都会包含min/max统计信息。