在 Spark 中重用连接的数据框
Reusing a joined dataframe in Spark
我 运行 在本地使用 HDFS 和 Spark 并试图了解 Spark 持久性的工作原理。我的 objective 是将连接的数据集存储在内存中,然后 运行 动态查询它。但是,我的查询似乎是重做连接而不是简单地扫描持久的预连接数据集。
我通过从 HDFS 加载两个 CSV 文件,创建并保存了两个数据帧,比方说 df1 和 df2。我将两个数据帧的连接保存在内存中:
val result = df1.join(df2, "USERNAME")
result.persist()
result.count()
然后我在结果之上定义了一些操作:
val result2 = result.select("FOO", "BAR").groupBy("FOO").sum("BAR")
result2.show()
'result2' 不会依赖持久化的结果并自行重做连接。以下是 result 和 result2 的物理计划:
== Physical Plan for result ==
InMemoryColumnarTableScan [...], (InMemoryRelation [...], true, 10000, StorageLevel(true, true, false, true, 1), (TungstenProject [...]), None)
== Physical Plan for result2 ==
TungstenAggregate(key=[FOO#2], functions=[(sum(cast(BAR#10 as double)),mode=Final,isDistinct=false)], output=[FOO#2,sum(BAR)#837])
TungstenExchange hashpartitioning(FOO#2)
TungstenAggregate(key=[FOO#2], functions=[(sum(cast(BAR#10 as double)),mode=Partial,isDistinct=false)], output=[FOO#2,currentSum#1311])
InMemoryColumnarTableScan [FOO#2,BAR#10], (InMemoryRelation [...], true, 10000, StorageLevel(true, true, false, true, 1), (TungstenProject [...]), None)
我会天真地假设,因为连接已经完成并在内存中分区,第二个操作将只包含对每个分区的聚合操作。从头开始重做连接应该更昂贵。我是假设错误还是做错了什么?另外,这是保留连接数据集供以后查询的正确模式吗?
编辑:郑重声明,在我调低随机分区数后,第二个查询的性能提高了很多。默认情况下,spark.sql.shuffle.partitions 设置为 200。只需在我的本地实例上将其设置为 1 即可显着提高性能。
如果我们查看计划,我们会发现 Spark 实际上正在使用缓存数据而不是重做连接。从下往上开始:
这是 Spark 从您的缓存中读取数据:
InMemoryColumnarTableScan [FOO#2,BAR#10], (InMemoryRelation ...
这是 Spark 在每个分区中按 FOO 聚合 BAR - 查找模式=部分
TungstenAggregate(key=[FOO#2], functions=[(sum(cast(BAR#10 as double)),mode=Partial ...
这是 Spark 对上一步每个分区的数据进行洗牌:
TungstenExchange hashpartitioning(FOO#2)
这是 Spark 聚合打乱后的分区总和 - 寻找 mode=Final
TungstenAggregate(key=[FOO#2], functions=[(sum(cast(BAR#10 as double)),mode=Final ...
阅读这些计划有点痛苦,所以如果您可以访问 Spark UI(我认为是 1.5+)的 SQL 选项卡,我建议您改用它。
我 运行 在本地使用 HDFS 和 Spark 并试图了解 Spark 持久性的工作原理。我的 objective 是将连接的数据集存储在内存中,然后 运行 动态查询它。但是,我的查询似乎是重做连接而不是简单地扫描持久的预连接数据集。
我通过从 HDFS 加载两个 CSV 文件,创建并保存了两个数据帧,比方说 df1 和 df2。我将两个数据帧的连接保存在内存中:
val result = df1.join(df2, "USERNAME")
result.persist()
result.count()
然后我在结果之上定义了一些操作:
val result2 = result.select("FOO", "BAR").groupBy("FOO").sum("BAR")
result2.show()
'result2' 不会依赖持久化的结果并自行重做连接。以下是 result 和 result2 的物理计划:
== Physical Plan for result ==
InMemoryColumnarTableScan [...], (InMemoryRelation [...], true, 10000, StorageLevel(true, true, false, true, 1), (TungstenProject [...]), None)
== Physical Plan for result2 ==
TungstenAggregate(key=[FOO#2], functions=[(sum(cast(BAR#10 as double)),mode=Final,isDistinct=false)], output=[FOO#2,sum(BAR)#837])
TungstenExchange hashpartitioning(FOO#2)
TungstenAggregate(key=[FOO#2], functions=[(sum(cast(BAR#10 as double)),mode=Partial,isDistinct=false)], output=[FOO#2,currentSum#1311])
InMemoryColumnarTableScan [FOO#2,BAR#10], (InMemoryRelation [...], true, 10000, StorageLevel(true, true, false, true, 1), (TungstenProject [...]), None)
我会天真地假设,因为连接已经完成并在内存中分区,第二个操作将只包含对每个分区的聚合操作。从头开始重做连接应该更昂贵。我是假设错误还是做错了什么?另外,这是保留连接数据集供以后查询的正确模式吗?
编辑:郑重声明,在我调低随机分区数后,第二个查询的性能提高了很多。默认情况下,spark.sql.shuffle.partitions 设置为 200。只需在我的本地实例上将其设置为 1 即可显着提高性能。
如果我们查看计划,我们会发现 Spark 实际上正在使用缓存数据而不是重做连接。从下往上开始:
这是 Spark 从您的缓存中读取数据:
InMemoryColumnarTableScan [FOO#2,BAR#10], (InMemoryRelation ...
这是 Spark 在每个分区中按 FOO 聚合 BAR - 查找模式=部分
TungstenAggregate(key=[FOO#2], functions=[(sum(cast(BAR#10 as double)),mode=Partial ...
这是 Spark 对上一步每个分区的数据进行洗牌:
TungstenExchange hashpartitioning(FOO#2)
这是 Spark 聚合打乱后的分区总和 - 寻找 mode=Final
TungstenAggregate(key=[FOO#2], functions=[(sum(cast(BAR#10 as double)),mode=Final ...
阅读这些计划有点痛苦,所以如果您可以访问 Spark UI(我认为是 1.5+)的 SQL 选项卡,我建议您改用它。