Spark 更新缓存数据集
Spark update cached dataset
我有一个排序的数据集,它根据数据集头部的值在一个循环内更新(过滤)。
如果我每 n(例如,50)个周期缓存一次数据集,我的性能就会很好。
但是,经过一定数量的循环后,缓存似乎不起作用,因为程序变慢了(我猜是因为分配给缓存的内存已满)。
我问的是是否以及如何在缓存中只维护更新的数据集,以便不填满内存并保持良好的性能。
请在下面找到我的代码示例:
dataset = dataset.sort(/* sort condition */)
dataset.cache()
var head = dataset.head(1)
var count = 0
while (head.nonEmpty) {
count +=1
/* custom operation with the head */
dataset = dataset.filter(/* filter condition based on the head of the dataset */
if (count % 50 == 0) {
dataset.cache()
}
head = dataset.head(1)
}
单靠 cache
无法帮助您。随着每次迭代沿袭和执行计划的增长,这不是单独缓存可以解决的问题。
你至少应该打破血统:
if (count % 50 == 0) {
dataset.cache()
dataset.checkpoint
}
虽然我个人也会将数据写入分布式存储并读回:
if (count % 50 == 0) {
dataset.write.parquet(s"/some/path/$count")
dataset = spark.read.parquet(s"/some/path/$count")
}
它可能无法接受,具体取决于您的部署,但在许多情况下其行为比缓存和检查点更可预测
尝试在缓存之前取消缓存数据集,这样您将从内存中删除数据集的旧副本并仅保留最新副本,避免内存中存在多个副本。以下是示例,但您已根据代码逻辑
将 dataset.unpersist() 保持在正确的位置
if (count % 50 == 0) {
dataset.cache()
}
dataset = dataset.filter(/* filter condition based on the head of the dataset */
if (count % 50 == 0) {
dataset.cache()
}
我有一个排序的数据集,它根据数据集头部的值在一个循环内更新(过滤)。
如果我每 n(例如,50)个周期缓存一次数据集,我的性能就会很好。
但是,经过一定数量的循环后,缓存似乎不起作用,因为程序变慢了(我猜是因为分配给缓存的内存已满)。
我问的是是否以及如何在缓存中只维护更新的数据集,以便不填满内存并保持良好的性能。 请在下面找到我的代码示例:
dataset = dataset.sort(/* sort condition */)
dataset.cache()
var head = dataset.head(1)
var count = 0
while (head.nonEmpty) {
count +=1
/* custom operation with the head */
dataset = dataset.filter(/* filter condition based on the head of the dataset */
if (count % 50 == 0) {
dataset.cache()
}
head = dataset.head(1)
}
cache
无法帮助您。随着每次迭代沿袭和执行计划的增长,这不是单独缓存可以解决的问题。
你至少应该打破血统:
if (count % 50 == 0) {
dataset.cache()
dataset.checkpoint
}
虽然我个人也会将数据写入分布式存储并读回:
if (count % 50 == 0) {
dataset.write.parquet(s"/some/path/$count")
dataset = spark.read.parquet(s"/some/path/$count")
}
它可能无法接受,具体取决于您的部署,但在许多情况下其行为比缓存和检查点更可预测
尝试在缓存之前取消缓存数据集,这样您将从内存中删除数据集的旧副本并仅保留最新副本,避免内存中存在多个副本。以下是示例,但您已根据代码逻辑
将 dataset.unpersist() 保持在正确的位置 if (count % 50 == 0) {
dataset.cache()
}
dataset = dataset.filter(/* filter condition based on the head of the dataset */
if (count % 50 == 0) {
dataset.cache()
}