Apache Spark:缓存和分区

Apache Spark: cache and partitions

情况如下:我有一个不断增长的数据集合,我想在 Hadoop 集群中使用 RDD 处理这些数据。

这是一个简短的例子:

val elementA = (1, Seq(2, 3))
val elementB = (2, Seq(1, 3))
val elementC = (3, Seq(1, 2))

val testRDD = sc.parallelize(Seq(elementA, elementB, elementC)).
    map(x => (x._1, x._2)).setName("testRDD").cache()

val elementD = (4, Seq(1, 3))
val elementD1 = (1, Seq(4))
val elementD2 = (3, Seq(4))

val testAdd = sc.parallelize(Seq(elementD, elementD1, elementD2)).
    map(x => (x._1, x._2)).setName("testAdd")

val testResult = testRDD.cogroup(testAdd).mapValues(x => (x._1 ++ x._2).flatten)

结果将是这样的(元素的顺序可以变化):

(1, List(2, 3, 4))
(2, List(1, 3))
(3, List(1, 2, 4))
(4, List(1, 3))

这是我的目标:

  1. 我想 .cache() 我的 RDD 在集群内存中。
  2. 我希望能够向现有 RDD 添加新元素。

这是我的结论:

  1. RDD 中的每个分区单独且完整地缓存(例如,我有一个包含 100 个元素和 4 个分区的集合,我调用了 .cache().collect()cache().first(),在第一种情况下得到了 4 个缓存分区,并且1 在第二种情况下)。
  2. testRDD.cogroup(testAdd) 的结果是新的 RDD,可以再次缓存,如果我们尝试使用 var testRDD 并调用 testRDD = testRDD.cogroup(testAdd),我们将丢失 link到缓存数据。
  3. 我知道,RDD 最适合批处理应用程序,我这里有这个:每个新元素的 Seq() 将根据另一个元素的属性计算。

Is there any way to modify current RDD without removing all of it's elements from cache?

我想做一种临时存储,并在临时存储达到一定限制后将临时存储与当前存储合并...

RDD 是不可变的,因此您不能向它们添加新元素。但是,您可以通过将原始 RDD 与新元素合并来创建新的 RDD,这与您对 testResult RDD 所做的类似。

如果你想为新的 RDD 使用相同的变量和更新,你可以为那个 RDD 使用 var 而不是 val。例如

var testRDD = sc.parallelize(...) val testAdd = sc.parallelize(...) testRDD = testRDD.union(testAdd) testRDD.cache()

这将创建一个连接两个原始 RDD 的沿袭。如果您在 testRDD 上多次调用 union,这可能会导致问题。要解决这个问题,您可以在 testRDD 联合多次后调用 checkpoint,比如每 10 次更新一次。您还可以考虑在检查点时在 testRDD 上调用 repartion。

所有添加到 testRDD 的元素都应该使用这种技术保留在缓存中。