Apache Spark 方法返回一个 RDD(带尾递归)

Apache Spark Method returning an RDD (with Tail Recursion)

RDD 具有沿袭性,因此在对其执行操作之前不存在;所以,如果我有一个对 RDD 执行大量转换的方法和 returns 一个转换后的 RDD 那么我实际上返回了什么? 在某个操作需要该 RDD 之前,我是否不返回任何内容?如果我在方法中缓存了一个 RDD,它会持久存在缓存中吗?我想我知道这个问题的答案:当在返回的 RDD 上调用操作时,方法只会是 运行?但我可能是错的。

这个问题的扩展是: 如果我有一个尾递归方法,它接受一个 RDD 作为参数,returns 一个 RDD,但我在方法中缓存 RDD:

def method(myRDD : RDD) : RDD = {
   ...
   anRDD.cache
   if(true) return someRDD
   method(someRDD) // tailrec
}

然后,当发生尾递归时,它会覆盖之前缓存的 RDD anRDD 还是两者都保留?我想两者都会坚持。当我使用的数据集只有 63mb 大时,我将数据溢出到磁盘。而且我认为这可能与尾递归方法有关。

RDD 谱系构建为链接在一起的 RDD 对象实例图,其中谱系中的每个节点都有对其依赖项的引用。在最简单的链形式中,您可以将其视为链表:

hadoopRDD(location) <-depends- filteredRDD(f:A->Boolean) <-depends- mappedRDD(f:A->B)

您可以在基本 RDD 构造函数中欣赏这一点:

/** Construct an RDD with just a one-to-one dependency on one parent */
  def this(@transient oneParent: RDD[_]) =
    this(oneParent.context , List(new OneToOneDependency(oneParent))) 

言归正传:我们可以用递归的方式构建链表,也可以构建RDD lineage。作用于 RDD 的递归函数的结果将是一个定义良好的 RDD。

将需要一个动作来安排该谱系的执行,并将具体化它所代表的计算,就像人们可以 "walk" 创建链表一样。

考虑这个(相当做作,我必须承认)示例:

def isPrime(n:Int):Boolean = {
    (n == 2) || (!( n % 2 ==0) && !((3 to math.sqrt(n).ceil.toInt) exists (x => n % x == 0)))
}

def recPrimeFilter(rdd:RDD[Int], i:Int):RDD[Int] = 
if (i<=1) rdd else if (isPrime(i)) recPrimeFilter(rdd.filter(x=> x!=i), i-1) else (recPrimeFilter(rdd.map(x=>x+i), i-1))

当应用于整数 RDD 时,我们可以观察到素数位置的交错过滤器和映射结​​果的谱系:

val rdd = sc.parallelize(1 to 100)
val res = weirdPrimeFilter(rdd,15)
scala> res.toDebugString
res3: String = 
(8) FilteredRDD[54] at filter at <console>:18 []
 |  FilteredRDD[53] at filter at <console>:18 []
 |  MappedRDD[52] at map at <console>:18 []
 |  FilteredRDD[51] at filter at <console>:18 []
 |  MappedRDD[50] at map at <console>:18 []
 |  FilteredRDD[49] at filter at <console>:18 []
 |  MappedRDD[48] at map at <console>:18 []
 |  MappedRDD[47] at map at <console>:18 []
 |  MappedRDD[46] at map at <console>:18 []
 |  FilteredRDD[45] at filter at <console>:18 []
 |  MappedRDD[44] at map at <console>:18 []
 |  FilteredRDD[43] at filter at <console>:18 []
 |  MappedRDD[42] at map at <console>:18 []
 |  MappedRDD[41] at map at <console>:18 []
 |  ParallelCollectionRDD[33] at parallelize at <console>:13 []

'cache' 打破了沿袭,使 RDD 在缓存到 "remember" 的位置第一次经过那里时它的内容,以便沿袭中更远的所有相关 RDD 可以重用它缓存数据。 在线性 RDD 谱系的基本情况下,它根本不会产生任何影响,因为每个节点只会被访问一次。

在这种情况下,如果递归 RDD 构造过程创建一个图形或树状结构,其中在许多不同的 'leaf' 节点调用操作,则缓存可能有意义。