In Spark, how do you read parquet files that were written with bucketBy, and preserve the bucketing data?
In Apache Spark 2.4.5, how do you open a set of parquet files that were written with bucketBy and saveAsTable?
For example:
case class VeryVeryDeeplyNestedThing(
  s: String,
  nested1: OtherVeryDeeplyNestedThing
)
case class OtherVeryDeeplyNestedThing(
  youGetTheIdeaNoOneWantsToHandWriteASqlStatementForThese: NestedMcNesty
)

List(VeryVeryDeeplyNestedThing(...)).toDS()
  .write
  .bucketBy(512, "s")
  .option("path", "/tmp/output")
  .format("parquet")
  .saveAsTable("mytable")
Now there is a set of parquet files in /tmp/output. Move the files from /tmp/output to /tmp/newPlace, and start a completely new Spark session.
spark.read.parquet("/tmp/newPlace")
.whatGoesHere?
What do you have to do to read them back with the same bucketing information they were written with? That information doesn't seem to be baked into the parquet files themselves, or is it?
[Edit: per @thebluephantom, added a partial working example from https://kb.databricks.com/_static/notebooks/data/bucketing-example.html, which I think shows that reading really does require something special.]
If you create the parquet files like this:
scala> def base = spark.range(1, 160000, 1, 16).select($"id" as "key", rand(12) as "value")
base: org.apache.spark.sql.DataFrame
scala> import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SaveMode
scala> base.write.format("parquet").bucketBy(16, "key").sortBy("value").option("path", "/tmp/example").mode(SaveMode.Overwrite).saveAsTable("bucketed")
scala> base.write.format("parquet").option("path", "/tmp/exampleunbucketed").mode(SaveMode.Overwrite).saveAsTable("unbucketed")
scala> val t2 = spark.table("bucketed")
t2: org.apache.spark.sql.DataFrame = [key: bigint, value: double]
scala> val t3 = spark.table("bucketed")
t3: org.apache.spark.sql.DataFrame = [key: bigint, value: double]
// This is joining two bucketed tables
scala> t3.join(t2, Seq("key")).explain()
== Physical Plan ==
*(2) Project [key#51L, value#52, value#58]
+- *(2) BroadcastHashJoin [key#51L], [key#57L], Inner, BuildRight
:- *(2) Project [key#51L, value#52]
: +- *(2) Filter isnotnull(key#51L)
: +- *(2) FileScan parquet default.bucketed[key#51L,value#52] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-172-31-66-61.ec2.internal:50070/tmp/example], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>, SelectedBucketsCount: 16 out of 16
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
+- *(1) Project [key#57L, value#58]
+- *(1) Filter isnotnull(key#57L)
+- *(1) FileScan parquet default.bucketed[key#57L,value#58] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-172-31-66-61.ec2.internal:50070/tmp/example], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>, SelectedBucketsCount: 16 out of 16
Both sides have a FileScan parquet default.bucketed. Now just plain-read the parquet files and explain the join:
scala> val t4 = spark.read.parquet("/tmp/example")
t4: org.apache.spark.sql.DataFrame = [key: bigint, value: double]
scala> t3.join(t4, Seq("key")).explain()
== Physical Plan ==
*(2) Project [key#51L, value#52, value#64]
+- *(2) BroadcastHashJoin [key#51L], [key#63L], Inner, BuildRight
:- *(2) Project [key#51L, value#52]
: +- *(2) Filter isnotnull(key#51L)
: +- *(2) FileScan parquet default.bucketed[key#51L,value#52] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-172-31-66-61.ec2.internal:50070/tmp/example], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>, SelectedBucketsCount: 16 out of 16
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
+- *(1) Project [key#63L, value#64]
+- *(1) Filter isnotnull(key#63L)
+- *(1) FileScan parquet [key#63L,value#64] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-172-31-66-61.ec2.internal:50070/tmp/example], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>
t4 has nothing indicating that it is bucketed.
Does that matter? Is it still bucketed? Am I misreading the explain output? Or do I have to do something to make sure t4 uses the buckets?
When we use bucketing or clustering while writing data, the data is split into multiple files.
For example:
id,name,city
1,a,CA
2,b,NYC
3,c,NYC
4,d,CA
# So after bucketing based on city, two files will be created
id,name,city
1,a,CA
4,d,CA
and
id,name,city
2,b,NYC
3,c,NYC
So when we read the files from the new location, we will still be able to read all of the data.
Bucketing helps when you want to push down certain predicates, because it limits Spark to reading only the relevant files.
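To make that concrete, here is a minimal sketch of the read-back described above, assuming the bucketed files were moved to /tmp/newPlace and using the id/name/city columns from the example (variable names are just for illustration):

import spark.implicits._

// Plain spark.read.parquet still sees all of the rows, regardless of how the
// data was bucketed on write.
val people = spark.read.parquet("/tmp/newPlace")
people.count()

// A filter on the bucketing column is pushed down to the parquet scan; explain()
// typically shows it under PushedFilters, e.g. [IsNotNull(city), EqualTo(city,NYC)].
people.filter($"city" === "NYC").explain()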
Hope that answers it.
You don't. bucketBy is a table-based API, simple as that.
You use bucketBy so the table is written sorted and subsequent JOINs are faster by avoiding shuffles; hence it is typically used for temporary, intermediate-result processing in ETL.
Reading does not require anything special to be added to the query, but the JOINed tables must both be bucketed, with the same number of buckets and partitions. See this excellent post: https://kb.databricks.com/_static/notebooks/data/bucketing-example.html. In addition, the number of Spark SQL shuffle partitions must equal the number of buckets.
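One way to see what "table-based" means in practice is to re-register already-written bucketed files as an external bucketed table, so that spark.table picks the bucket spec back up where spark.read.parquet cannot. This is a sketch, not from the linked post: the table name bucketed_again and the exact DDL are assumptions, and the column list, bucket column and bucket count must match what bucketBy actually wrote.

// Sketch: register the bucketed parquet files from the key/value example above
// (written to /tmp/example with 16 buckets on "key", sorted by "value") as an
// external bucketed table in the metastore.
spark.sql("""
  CREATE TABLE bucketed_again (key BIGINT, value DOUBLE)
  USING parquet
  CLUSTERED BY (key) SORTED BY (value) INTO 16 BUCKETS
  LOCATION '/tmp/example'
""")

// Read through the table API so the bucket metadata is applied.
val t5 = spark.table("bucketed_again")
t5.join(spark.table("bucketed"), Seq("key")).explain()
// Both FileScans should now report SelectedBucketsCount: 16 out of 16.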
Update
With small data a broadcast hash join may occur instead, so set the following:
spark.conf.set("spark.sql.sources.bucketing.enabled", true)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
Also, I would advise using spark.table rather than spark.read.parquet...
bucketBy only works with the table API. See https://engineering.taboola.com/bucket-the-shuffle-out-of-here/