Spark 更新缓存数据集

Spark update cached dataset

我有一个排序的数据集,它根据数据集头部的值在一个循环内更新(过滤)。

如果我每 n(例如,50)个周期缓存一次数据集,我的性能就会很好。

但是,经过一定数量的循环后,缓存似乎不起作用,因为程序变慢了(我猜是因为分配给缓存的内存已满)。

我问的是是否以及如何在缓存中只维护更新的数据集,以便不填满内存并保持良好的性能。 请在下面找到我的代码示例:

dataset = dataset.sort(/* sort condition */)
dataset.cache()
var head = dataset.head(1)
var count = 0
while (head.nonEmpty) {
  count +=1
  /* custom operation with the head */
  dataset = dataset.filter(/* filter condition based on the head of the dataset */
  if (count % 50 == 0) {
    dataset.cache()
  }
  head = dataset.head(1)
}
单靠

cache 无法帮助您。随着每次迭代沿袭和执行计划的增长,这不是单独缓存可以解决的问题。

你至少应该打破血统:

if (count % 50 == 0) {
  dataset.cache()
  dataset.checkpoint
}

虽然我个人也会将数据写入分布式存储并读回:

if (count % 50 == 0) {
  dataset.write.parquet(s"/some/path/$count")
  dataset = spark.read.parquet(s"/some/path/$count")
}

它可能无法接受,具体取决于您的部署,但在许多情况下其行为比缓存和检查点更可预测

尝试在缓存之前取消缓存数据集,这样您将从内存中删除数据集的旧副本并仅保留最新副本,避免内存中存在多个副本。以下是示例,但您已根据代码逻辑

将 dataset.unpersist() 保持在正确的位置
  if (count % 50 == 0) {
    dataset.cache()
  }

  dataset = dataset.filter(/* filter condition based on the head of the dataset */

  if (count % 50 == 0) {
    dataset.cache()
  }