为什么 checkpoint() 比 persist() 快
Why is checkpoint() faster than persist()
我有一个使用 DataFrame 进行计算的代码。
+------------------------------------+------------+----------+----+------+
| Name| Role|Experience|Born|Salary|
+------------------------------------+------------+----------+----+------+
| 瓮䇮滴ୗ┦附䬌┊ᇕ鈃디蠾综䛿ꩁ翨찘... | охранник| 16|1960|108111|
| 擲鱫뫉ܞ琱폤縭ᘵ훧귚۔♧䋐滜컑... | повар| 14|1977| 40934|
| 㑶뇨ꄳ壚ᗜ㙣샾ꎓ㌸翧쉟梒靻駌푤... | геодезист| 29|1997| 27335|
| ࣆ᠘䬆䨎⑁烸ᯠણ ᭯몇믊ຮ쭧닕㟣紕... | не охранн. | 4|1999 | 30000|
... ... ...
我尝试以不同的方式缓存 table。
def processDataFrame(mode: String): Long = {
val t0 = System.currentTimeMillis
val topDf = df.filter(col("Salary").>(50000))
val cacheDf = mode match {
case "CACHE" => topDf.cache()
case "PERSIST" => topDf.persist()
case "CHECKPOINT" => topDf.checkpoint()
case "CHECKPOINT_NON_EAGER" => topDf.checkpoint(false)
case _ => topDf
}
val roleList = cacheDf.groupBy("Role")
.count()
.orderBy("Role")
.collect()
val bornList = cacheDf.groupBy("Born")
.count()
.orderBy(col("Born").desc)
.collect()
val t1 = System.currentTimeMillis()
t1-t0 // time result
}
我得到了让我思考的结果。
为什么 checkpoint(false) 比 persist() 更有效?
毕竟,检查点需要时间来序列化对象并将它们写入磁盘。
P.S。我在 GitHub 上的小项目:https://github.com/MinorityMeaning/CacheCheckpoint
我没有检查你的项目,但我认为它值得进行一次小的讨论。
我希望您清楚地指出您没有 运行 一次此代码,而是对几个 运行 进行平均,以确定此特定数据集的性能。 (不是效率)Spark Clusters 可能有很多噪音,导致作业之间的差异,并且平均几个 运行s 确实需要确定性能。有几个性能因素(数据 locality/Spark 执行程序、资源争用等)
我认为您不能说“高效”,因为这些函数实际上执行两种不同的功能。他们也会因为他们所做的事情而在不同的情况下表现不同。有时您会想要检查点,以 t运行 分类数据沿袭或在计算量非常大的操作之后。有时重新计算谱系实际上比从磁盘写入和读取更便宜。
简单的规则是,如果您要多次使用此 table/DataFrame/DataSet 将其缓存在内存中。(不是磁盘)
一旦您在未完成的工作中遇到问题,请考虑可以调整的内容。从代码 perspective/query 的角度来看。
之后...
当且仅当这与复杂作业的失败有关并且您看到执行程序失败时,才考虑使用磁盘来保存数据。这应该始终是故障排除的后续步骤,而不是故障排除的第一步。
我有一个使用 DataFrame 进行计算的代码。
+------------------------------------+------------+----------+----+------+
| Name| Role|Experience|Born|Salary|
+------------------------------------+------------+----------+----+------+
| 瓮䇮滴ୗ┦附䬌┊ᇕ鈃디蠾综䛿ꩁ翨찘... | охранник| 16|1960|108111|
| 擲鱫뫉ܞ琱폤縭ᘵ훧귚۔♧䋐滜컑... | повар| 14|1977| 40934|
| 㑶뇨ꄳ壚ᗜ㙣샾ꎓ㌸翧쉟梒靻駌푤... | геодезист| 29|1997| 27335|
| ࣆ᠘䬆䨎⑁烸ᯠણ ᭯몇믊ຮ쭧닕㟣紕... | не охранн. | 4|1999 | 30000|
... ... ...
我尝试以不同的方式缓存 table。
def processDataFrame(mode: String): Long = {
val t0 = System.currentTimeMillis
val topDf = df.filter(col("Salary").>(50000))
val cacheDf = mode match {
case "CACHE" => topDf.cache()
case "PERSIST" => topDf.persist()
case "CHECKPOINT" => topDf.checkpoint()
case "CHECKPOINT_NON_EAGER" => topDf.checkpoint(false)
case _ => topDf
}
val roleList = cacheDf.groupBy("Role")
.count()
.orderBy("Role")
.collect()
val bornList = cacheDf.groupBy("Born")
.count()
.orderBy(col("Born").desc)
.collect()
val t1 = System.currentTimeMillis()
t1-t0 // time result
}
我得到了让我思考的结果。
为什么 checkpoint(false) 比 persist() 更有效? 毕竟,检查点需要时间来序列化对象并将它们写入磁盘。
P.S。我在 GitHub 上的小项目:https://github.com/MinorityMeaning/CacheCheckpoint
我没有检查你的项目,但我认为它值得进行一次小的讨论。 我希望您清楚地指出您没有 运行 一次此代码,而是对几个 运行 进行平均,以确定此特定数据集的性能。 (不是效率)Spark Clusters 可能有很多噪音,导致作业之间的差异,并且平均几个 运行s 确实需要确定性能。有几个性能因素(数据 locality/Spark 执行程序、资源争用等)
我认为您不能说“高效”,因为这些函数实际上执行两种不同的功能。他们也会因为他们所做的事情而在不同的情况下表现不同。有时您会想要检查点,以 t运行 分类数据沿袭或在计算量非常大的操作之后。有时重新计算谱系实际上比从磁盘写入和读取更便宜。
简单的规则是,如果您要多次使用此 table/DataFrame/DataSet 将其缓存在内存中。(不是磁盘)
一旦您在未完成的工作中遇到问题,请考虑可以调整的内容。从代码 perspective/query 的角度来看。
之后...
当且仅当这与复杂作业的失败有关并且您看到执行程序失败时,才考虑使用磁盘来保存数据。这应该始终是故障排除的后续步骤,而不是故障排除的第一步。