分区未在简单的 SparkSQL 查询中被修剪
Partitions not being pruned in simple SparkSQL queries
我正在尝试从 SparkSQL table(S3 中的镶木地板)中高效地 select 单个分区。但是,我看到 Spark 打开 table 中的所有 parquet 文件的证据,而不仅仅是那些通过过滤器的文件。对于具有大量分区的 tables,这使得即使是小查询也很昂贵。
这是一个说明性的例子。我使用 SparkSQL 和 Hive 元存储在 S3 上创建了一个简单的分区 table:
# Make some data
df = pandas.DataFrame({'pk': ['a']*5+['b']*5+['c']*5,
'k': ['a', 'e', 'i', 'o', 'u']*3,
'v': range(15)})
# Convert to a SparkSQL DataFrame
sdf = hiveContext.createDataFrame(df)
# And save it
sdf.write.partitionBy('pk').saveAsTable('dataset',
format='parquet',
path='s3a://bucket/dataset')
在随后的会话中,我想 select 这个 table 的一个子集:
dataset = hiveContext.table('dataset')
filtered_dataset = dataset.filter(dataset.pk == 'b')
print filtered_dataset.toPandas()
在随后打印的日志中,我看到修剪应该正在发生:
15/07/05 02:39:39 INFO DataSourceStrategy: Selected 1 partitions out of 3, pruned -200.0% partitions.
但后来我看到所有分区都打开了 parquet 文件:
15/07/05 02:39:39 INFO S3AFileSystem: Reopening dataset/pk=a/part-r-00001.gz.parquet to seek to new offset 508
15/07/05 02:39:39 INFO S3AFileSystem: Actually opening file dataset/pk=a/part-r-00001.gz.parquet at pos 508
15/07/05 02:39:39 INFO S3AFileSystem: Reopening dataset/pk=b/part-r-00001.gz.parquet to seek to new offset 509
15/07/05 02:39:39 INFO S3AFileSystem: Actually opening file dataset/pk=b/part-r-00001.gz.parquet at pos 509
15/07/05 02:39:39 INFO S3AFileSystem: Reopening dataset/_common_metadata to seek to new offset 262
15/07/05 02:39:39 INFO S3AFileSystem: Actually opening file dataset/_common_metadata at pos 262
15/07/05 02:39:39 INFO S3AFileSystem: Reopening dataset/pk=c/part-r-00001.gz.parquet to seek to new offset 509
15/07/05 02:39:39 INFO S3AFileSystem: Actually opening file dataset/pk=c/part-r-00001.gz.parquet at pos 509
15/07/05 02:39:39 INFO S3AFileSystem: Reopening dataset/pk=b/part-r-00001.gz.parquet to seek to new offset -365
15/07/05 02:39:39 INFO S3AFileSystem: Actually opening file dataset/pk=b/part-r-00001.gz.parquet at pos 152
15/07/05 02:39:39 INFO S3AFileSystem: Reopening dataset/pk=a/part-r-00001.gz.parquet to seek to new offset -365
15/07/05 02:39:39 INFO S3AFileSystem: Actually opening file dataset/pk=a/part-r-00001.gz.parquet at pos 151
15/07/05 02:39:39 INFO S3AFileSystem: Reopening dataset/_common_metadata to seek to new offset -266
15/07/05 02:39:39 INFO S3AFileSystem: Actually opening file dataset/_common_metadata at pos 4
15/07/05 02:39:39 INFO S3AFileSystem: Reopening dataset/pk=c/part-r-00001.gz.parquet to seek to new offset -365
15/07/05 02:39:39 INFO S3AFileSystem: Actually opening file dataset/pk=c/part-r-00001.gz.parquet at pos 152
只有三个分区,这不是问题---但是有数千个,它会导致明显的延迟。为什么打开所有这些不相关的文件?
查看 spark.sql.parquet.filterPushdown
,默认设置为 false
,因为 Spark 使用的 Parquet 版本中存在一些错误。
1.3/1.4可能可以使用,查看官方documentation.
我认为这在 Spark 1.5 中已修复。
我正在尝试从 SparkSQL table(S3 中的镶木地板)中高效地 select 单个分区。但是,我看到 Spark 打开 table 中的所有 parquet 文件的证据,而不仅仅是那些通过过滤器的文件。对于具有大量分区的 tables,这使得即使是小查询也很昂贵。
这是一个说明性的例子。我使用 SparkSQL 和 Hive 元存储在 S3 上创建了一个简单的分区 table:
# Make some data
df = pandas.DataFrame({'pk': ['a']*5+['b']*5+['c']*5,
'k': ['a', 'e', 'i', 'o', 'u']*3,
'v': range(15)})
# Convert to a SparkSQL DataFrame
sdf = hiveContext.createDataFrame(df)
# And save it
sdf.write.partitionBy('pk').saveAsTable('dataset',
format='parquet',
path='s3a://bucket/dataset')
在随后的会话中,我想 select 这个 table 的一个子集:
dataset = hiveContext.table('dataset')
filtered_dataset = dataset.filter(dataset.pk == 'b')
print filtered_dataset.toPandas()
在随后打印的日志中,我看到修剪应该正在发生:
15/07/05 02:39:39 INFO DataSourceStrategy: Selected 1 partitions out of 3, pruned -200.0% partitions.
但后来我看到所有分区都打开了 parquet 文件:
15/07/05 02:39:39 INFO S3AFileSystem: Reopening dataset/pk=a/part-r-00001.gz.parquet to seek to new offset 508
15/07/05 02:39:39 INFO S3AFileSystem: Actually opening file dataset/pk=a/part-r-00001.gz.parquet at pos 508
15/07/05 02:39:39 INFO S3AFileSystem: Reopening dataset/pk=b/part-r-00001.gz.parquet to seek to new offset 509
15/07/05 02:39:39 INFO S3AFileSystem: Actually opening file dataset/pk=b/part-r-00001.gz.parquet at pos 509
15/07/05 02:39:39 INFO S3AFileSystem: Reopening dataset/_common_metadata to seek to new offset 262
15/07/05 02:39:39 INFO S3AFileSystem: Actually opening file dataset/_common_metadata at pos 262
15/07/05 02:39:39 INFO S3AFileSystem: Reopening dataset/pk=c/part-r-00001.gz.parquet to seek to new offset 509
15/07/05 02:39:39 INFO S3AFileSystem: Actually opening file dataset/pk=c/part-r-00001.gz.parquet at pos 509
15/07/05 02:39:39 INFO S3AFileSystem: Reopening dataset/pk=b/part-r-00001.gz.parquet to seek to new offset -365
15/07/05 02:39:39 INFO S3AFileSystem: Actually opening file dataset/pk=b/part-r-00001.gz.parquet at pos 152
15/07/05 02:39:39 INFO S3AFileSystem: Reopening dataset/pk=a/part-r-00001.gz.parquet to seek to new offset -365
15/07/05 02:39:39 INFO S3AFileSystem: Actually opening file dataset/pk=a/part-r-00001.gz.parquet at pos 151
15/07/05 02:39:39 INFO S3AFileSystem: Reopening dataset/_common_metadata to seek to new offset -266
15/07/05 02:39:39 INFO S3AFileSystem: Actually opening file dataset/_common_metadata at pos 4
15/07/05 02:39:39 INFO S3AFileSystem: Reopening dataset/pk=c/part-r-00001.gz.parquet to seek to new offset -365
15/07/05 02:39:39 INFO S3AFileSystem: Actually opening file dataset/pk=c/part-r-00001.gz.parquet at pos 152
只有三个分区,这不是问题---但是有数千个,它会导致明显的延迟。为什么打开所有这些不相关的文件?
查看 spark.sql.parquet.filterPushdown
,默认设置为 false
,因为 Spark 使用的 Parquet 版本中存在一些错误。
1.3/1.4可能可以使用,查看官方documentation.
我认为这在 Spark 1.5 中已修复。