Scala + Spark collections 交互

Scala + Spark collections interactions

我正在我的小项目中工作,该项目使用图形作为主要结构。图由具有以下结构的顶点组成:

class SWVertex[T: ClassTag](
   val id: Long, 
   val data: T, 
   var neighbors: Vector[Long] = Vector.empty[Long], 
   val timestamp: Timestamp = new Timestamp(System.currentTimeMillis())
) extends Serializable { 
   def addNeighbor(neighbor: Long): Unit = {
      if (neighbor >= 0) { neighbors = neighbors :+ neighbor }
   }
}

备注:

  1. 会有很多顶点,我想可能会超过MAX_INT
  2. 每个顶点都有一个可变的邻居数组(它们只是另一个顶点的 ID)。
  3. 图中有特殊的添加顶点功能,使用BFS算法在图中选择最佳顶点连接新顶点-修改现有顶点和添加顶点的邻居数组。

我决定使用 Apache Spark 和 Scala 来处理和浏览我的图表,但我仍然存在一些误解:我知道,RDD 是一个并行数据集,我从 main collection 使用 parallelize() 方法,我发现修改源代码 collection 也会影响创建的 RDD。我用这段代码找到了这个:

val newVertex1 = new SWVertex[String](1, "test1")
val newVertex2 = new SWVertex[String](2, "test2")
var vertexData = Seq(newVertex1, newVertex2)

val testRDD1 = sc.parallelize(vertexData, vertexData.length)

testRDD1.collect().foreach(
   f => println("| ID: " + f.id + ", data: " + f.data + ", neighbors: "
   + f.neighbors.mkString(", "))
)

// The result is:
// | ID: 1, data: test1, neighbors: 
// | ID: 2, data: test2, neighbors: 


// Calling simple procedure, that uses `addNeighbor` on both parameters
makeFriends(vertexData(0), vertexData(1))

testRDD1.collect().foreach(
   f => println("| ID: " + f.id + ", data: " + f.data + ", neighbors: "
   + f.neighbors.mkString(", "))
)

// Now the result is:
// | ID: 1, data: test1, neighbors: 2
// | ID: 2, data: test2, neighbors: 1

,但我没有找到使用 RDD 方法做同样事情的方法(老实说,由于 RDD 的不可变性,我不确定这是否可能)。在这种情况下,问题是:

Is there any way to deal with such big amount of data, keeping the ability to access to the random vertices for modifying their neighbors lists and continuous appending of new vertices?

我认为解决方案必须使用某种 Vector 数据结构,在这种情况下我还有另一个问题:

Is it possible to store Scala structures in cluster memory?

P.S。我打算至少使用 Spark 来处理 BFS 搜索,但我会很高兴听到任何其他建议。

P.P.S。我读过 .view 创建 "lazy" collection 转换的方法,但仍然不知道如何使用它...

更新 1:就我正在阅读 Scala Cookbook 而言,我认为选择 Vector 将是最佳选择,因为在我的案例中使用图形意味着大量随机访问顶点 aka 图形元素并附加新顶点,但仍然 - 我不确定对如此大量的顶点使用 Vector 不会导致 OutOfMemoryException

更新 2:在上面的测试中,我发现内存发生了一些有趣的事情。这是交易(请记住,我正在使用 single-node Spark 集群):

// Test were performed using these lines of code:
val runtime = Runtime.getRuntime
var usedMemory = runtime.totalMemory - runtime.freeMemory

// In the beginning of my work, before creating vertices and collection:
usedMemory = 191066456 bytes // ~182 MB, 1st run 
usedMemory = 173991072 bytes // ~166 MB, 2nd run
// After creating collection with two vertices:
usedMemory = 191066456 bytes // ~182 MB, 1st run
usedMemory = 173991072 bytes // ~166 MB, 2nd run
// After creating testRDD1
usedMemory = 191066552 bytes // ~182 MB, 1st run 
usedMemory = 173991168 bytes // ~166 MB, 2nd run
// After performing first testRDD1.collect() function
usedMemory = 212618296 bytes // ~203 MB, 1st run 
usedMemory = 200733808 bytes // ~191 MB, 2nd run
// After calling makeFriends on source collection
usedMemory = 212618296 bytes // ~203 MB, 1st run 
usedMemory = 200733808 bytes // ~191 MB, 2nd run
// After calling testRDD1.collect() for modified collection
usedMemory = 216645128 bytes // ~207 MB, 1st run 
usedMemory = 203955264 bytes // ~195 MB, 2nd run

我知道这个测试量太低,无法确定我的结论,但我注意到:

  1. 创建 collection 时没有任何反应。
  2. 在此示例上创建 RDD 后,分配了 96 个字节,可能用于存储分区数据或其他内容。
  3. 我调用.collect()方法时分配了最多的内存,因为我基本上将所有数据收集到一个节点,而且,可能是因为single-node安装了Spark,我得到了两倍数据副本(此处不确定),占用了大约 23 MB 的内存。
  4. 有趣的时刻发生在修改邻居的数组之后,这需要额外的 4 MB 内存来存储它们。

让我尝试在这里解决不同的问题:

RDD is a parallel dataset, which I'm making from main collection using parallelize() method and I've discovered, that modifying source collection will take affect on created RDD as well.

RDD 是并行的,分布式 数据集。 parallelize 允许您获取本地集合并将其分发到集群中。您观察到的当前行为是,当改变底层对象时,RDD 表示也会发生改变,这只是因为程序当前在 1 个节点中 运行。在集群中,这种行为不可能

不变性是分布式计算的关键 'vertically':在同一处理器的多个内核上或 'horizontally':在集群中的多台机器上。

I didn't found the way to update the graph structure using RDD methods

要实现这一点,您需要根据分布式集合重新考虑图形结构。在当前的 OO 模型中,每个 Vertex 都包含自己的相邻顶点列表,并且需要对对象进行变异才能构建图形。 我们需要使顶点不可变,方法是仅使用它们的属性创建它们并将关系外部化为边列表。简而言之,这就是 GraphX 所做的。您的 Edge 看起来像:

case class Vertex[T: ClassTag](
   val id: Long, 
   val data: T, 
   val timestamp: Timestamp = new Timestamp(System.currentTimeMillis())
)

然后我们可以构建边的集合:

val Edges:RDD[(Long, Long)] // (Source Vertex Id, Dest Vertex Id)

然后,给定:

val usr1 = Vertex(1, "SuppieRK")
val usr2 = Vertex(2, "maasg")
val usr3 = Vertex(3, "graphy")
val usr4 = Vertex(4, "spark")

和一些初始关系:

val edgeSeq = Seq((1,2), (2,3))

以及这种关系的RDD:

val relations = sparkContext.parallelize(edgeSeq)

然后添加新关系将意味着创建新边:

val newRelations = sparkContext.parallelize(Seq((1,4),(2,4),(3,4))

union-将这些集合组合在一起。

val allRel = relations.union(newRelations)

这就是 "addFriend" 的实现方式,但我们可能会从某个地方读取该数据。此方法不用于对 Edges 集合进行逐一添加。您正在使用 Spark,因为要考虑的数据集非常大,您需要能够在多台机器上分配计算。

如果集合适合一个节点,我会坚持使用 "standard" Scala 表示和算法。