在广播变量中查找值

Finding values within broadcast variable

我想通过应用广播变量加入两个集合。我正在尝试实施 Spark: what's the best strategy for joining a 2-tuple-key RDD with single-key RDD?

的第一个建议
val emp_newBC = sc.broadcast(emp_new.collectAsMap())
val joined = emp.mapPartitions({ iter =>
      val m = emp_newBC.value
      for {
        ((t, w)) <- iter
        if m.contains(t)
      } yield ((w + '-' + m.get(t).get),1)
    }, preservesPartitioning = true)

然而,如此处所述:我需要使用 collect() 而不是 collectAsMAp()。我试着调整我的代码如下:

val emp_newBC = sc.broadcast(emp_new.collect())
val joined = emp.mapPartitions({ iter =>
      val m = emp_newBC.value
      for {
        ((t, w)) <- iter
        if m.contains(t)
        amk = m.indexOf(t)
      } yield ((w + '-' + emp_newBC.value(amk)),1) //yield ((t, w), (m.get(t).get))   //((w + '-' + m.get(t).get),1)
    }, preservesPartitioning = true)

但是好像m.contains(t)没有反应。我该如何补救?

提前致谢。

这样的怎么样?

val emp_newBC = sc.broadcast(emp_new.groupByKey.collectAsMap)

val joined = emp.mapPartitions(iter => for {
  (k, v1) <- iter
  v2 <- emp_newBC.value.getOrElse(k, Iterable())
} yield (s"$v1-$v2", 1))

关于您的代码...据我了解 emp_newRDD[(String, String)]。当它被收集时,你会得到一个 Array[(String, String)]。当你使用

((t, w)) <- iter

t 是一个 String 所以 m.contains(t) 总是 return false.

我看到的另一个问题是 mapPartitions 里面的 preservesPartitioning = true。有几种可能的情况:

  1. emp 已分区,您希望 joined 也已分区。由于您将键从 t 更改为某些新值分区无法保留,因此必须重新分区结果 RDD。如果你使用 preservesPartitioning = true 输出 RDD 将以错误的分区结束。
  2. emp 已分区,但您不需要为 joined 分区。没有理由使用 preservesPartitioning.
  3. emp 未分区。设置 preservesPartitioning 无效。