使用 monoid 进行 spark 分布式计算的示例

Example of usage of a monoid for distributed computation with spark

我有用户爱好数据(RDD[Map[String, Int]]) like:

("food" -> 3, "music" -> 1),
("food" -> 2),
("game" -> 5, "twitch" -> 3, "food" -> 3)

我想计算它们的统计数据,并将统计数据表示为 Map[String, Array[Int]] 而数组大小为 5,例如:

("food" -> Array(0, 1, 2, 0, 0),
 "music" -> Array(1, 0, 0, 0, 0),
 "game" -> Array(0, 0, 0, 0, 1),
 "twitch" -> Array(0, 0, 1, 0 ,0))

foldLeft似乎是正确的解决方案,但是RDD不能用,而且数据太大转换成List/Array才能用 foldLeft,我怎么能做这个工作?

诀窍是用 class 替换示例中的数组,其中包含您想要的某些数据的统计信息,并且可以与同一统计信息的另一个实例(涵盖其他部分数据)提供整体数据的统计。

例如,如果您有一个涵盖数据 3、3、2 和 5 的统计数据,我认为它看起来像 (0, 1, 2, 0, 1),如果您有另一个涵盖数据 3,4 的实例, 4 它看起来像 (0, 0, 1, 2,0)。现在您所要做的就是定义一个 + 操作,让您合并 (0, 1, 2, 0, 1) + (0, 0, 1, 2, 0) = (0,1,3,2,1),覆盖数据 3,3,2,5 和 3,4,4。

让我们这样做,然后调用 class StatMonoid:

case class StatMonoid(flags: Seq[Int] = Seq(0,0,0,0,0)) {
    def + (other: StatMonoid) = 
        new StatMonoid( (0 to 4).map{idx => flags(idx) + other.flags(idx)})
}

这个class包含5个计数器的序列,并定义一个+操作,让它与其他计数器组合。

我们还需要一个方便的方法来构建它,这可以是 StatMonoid 中的构造函数,在伴随对象中,或者只是一个简单的方法,如您所愿:

def stat(value: Int): StatMonoid = value match {
    case 1 => new StatMonoid(Seq(1,0,0,0,0))
    case 2 => new StatMonoid(Seq(0,1,0,0,0))
    case 3 => new StatMonoid(Seq(0,0,1,0,0))
    case 4 => new StatMonoid(Seq(0,0,0,1,0))
    case 5 => new StatMonoid(Seq(0,0,0,0,1))
    case _ => throw new RuntimeException("illegal init value: $value")
}

这使我们能够轻松计算涵盖单个数据的统计实例,例如:

scala> stat(4)
res25: StatMonoid = StatMonoid(List(0, 0, 0, 1, 0))

它还允许我们通过简单地添加它们来将它们组合在一起:

scala> stat(1) + stat(2) + stat(2) + stat(5) + stat(5) + stat(5)
res18: StatMonoid = StatMonoid(Vector(1, 2, 0, 0, 3))

现在将其应用于您的示例,假设我们有您提到的数据作为 Map 的 RDD:

val rdd =  sc.parallelize(List(Map("food" -> 3, "music" -> 1), Map("food" -> 2), Map("game" -> 5, "twitch" -> 3, "food" -> 3)))

我们需要做的就是找到每种食物的统计数据,将数据展平以获得 ("foodId" -> id) 元组,将每个 id 转换为 [=21= 的实例] 以上,最后将每种食物组合在一起:

import org.apache.spark.rdd.PairRDDFunctions
rdd.flatMap(_.toList).mapValue(stat).reduceByKey(_ + _).collect

产生:

res24: Array[(String, StatMonoid)] = Array((game,StatMonoid(List(0, 0, 0, 0, 1))), (twitch,StatMonoid(List(0, 0, 1, 0, 0))), (music,StatMonoid(List(1, 0, 0, 0, 0))), (food,StatMonoid(Vector(0, 1, 2, 0, 0))))

现在,对于旁白,如果你想知道为什么我称 class StateMonoid 只是因为......它 一个幺半群: D,还有一个非常常见和方便的,叫做 product 。简而言之,幺半群只是可以以关联方式相互组合的东西,它们在 Spark 中开发时非常常见,因为它们自然地定义了可以在分布式从属服务器上并行执行的操作,并聚集在一起形成最终结果。