如何为将来的排序合并连接保留排序的镶木地板表?

How to persist sorted parquet tables for future sort merge joins?

我想将一个大的 sorted table 保存到 S3 上的 Parquet,然后读入并使用 Sorted Merge Join 策略将其加入到另一个大的排序 table。

问题是:即使我事先在连接键上对这些 table 进行了排序,但一旦我将它们持久化到 Parquet,它们似乎就丢失了有关它们排序的信息。无论如何,有没有向 Spark 暗示我下次读入它们时不需要使用它们?

我一直在 Spark 1.5 上尝试这个,我不断得到 SQL EXPLAIN 看起来像这样的计划:

[== Physical Plan ==]
[TungstenProject [pos#28400,workf...#28399]]
[ SortMergeJoin [CHROM#28403,pos#28400], [CHROM#28399,pos#28332]]
[  TungstenSort [CHROM#28403 ASC,pos#28400 ASC], false, 0]
[   TungstenExchange hashpartitioning(CHROM#28403,pos#28400)]
[    ConvertToUnsafe]
[     Scan ParquetRelation[file:/....sorted.parquet][pos#284....8424]]
[  TungstenSort [CHROM#28399 ASC,pos#28332 ASC], false, 0]
[   TungstenExchange hashpartitioning(CHROM#28399,pos#28332)]
[    ConvertToUnsafe]
[     Scan ParquetRelation[file:....exploded_sorted.parquet][pos#2.....399]]

你可以在那里看到额外的 TungstenExchange 和 TungstenSort 阶段,即使这个连接是在两个 table 上,它们在保存到 Parquet 之前按连接键 orderBy 排序。

看起来像这样is coming in Spark 2.0 along with support for bucketing

遗憾的是,Spark-2.0 尚不支持使用分桶写入 S3。我昨天试了Spark-2.0-priview

val NUMBER_OF_BUCKETS = 20
rdd.toDF.write.mode(SaveMode.Overwrite)
        .bucketBy(NUMBER_OF_BUCKETS,"data_frame_key")
        .partitionBy("day")
        .save("s3://XXXXX")

并收到此错误消息:

java.lang.IllegalArgumentException: Currently we don't support writing bucketed data to this data source.
    at org.apache.spark.sql.DataFrameWriter.assertNotBucketed(DataFrameWriter.scala:462)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)