在广播变量中查找值
Finding values within broadcast variable
我想通过应用广播变量加入两个集合。我正在尝试实施 Spark: what's the best strategy for joining a 2-tuple-key RDD with single-key RDD?
的第一个建议
val emp_newBC = sc.broadcast(emp_new.collectAsMap())
val joined = emp.mapPartitions({ iter =>
val m = emp_newBC.value
for {
((t, w)) <- iter
if m.contains(t)
} yield ((w + '-' + m.get(t).get),1)
}, preservesPartitioning = true)
然而,如此处所述:我需要使用 collect() 而不是 collectAsMAp()。我试着调整我的代码如下:
val emp_newBC = sc.broadcast(emp_new.collect())
val joined = emp.mapPartitions({ iter =>
val m = emp_newBC.value
for {
((t, w)) <- iter
if m.contains(t)
amk = m.indexOf(t)
} yield ((w + '-' + emp_newBC.value(amk)),1) //yield ((t, w), (m.get(t).get)) //((w + '-' + m.get(t).get),1)
}, preservesPartitioning = true)
但是好像m.contains(t)没有反应。我该如何补救?
提前致谢。
这样的怎么样?
val emp_newBC = sc.broadcast(emp_new.groupByKey.collectAsMap)
val joined = emp.mapPartitions(iter => for {
(k, v1) <- iter
v2 <- emp_newBC.value.getOrElse(k, Iterable())
} yield (s"$v1-$v2", 1))
关于您的代码...据我了解 emp_new
是 RDD[(String, String)]
。当它被收集时,你会得到一个 Array[(String, String)]
。当你使用
((t, w)) <- iter
t
是一个 String
所以 m.contains(t)
总是 return false
.
我看到的另一个问题是 mapPartitions
里面的 preservesPartitioning = true
。有几种可能的情况:
emp
已分区,您希望 joined
也已分区。由于您将键从 t
更改为某些新值分区无法保留,因此必须重新分区结果 RDD
。如果你使用 preservesPartitioning = true
输出 RDD
将以错误的分区结束。
emp
已分区,但您不需要为 joined
分区。没有理由使用 preservesPartitioning
.
emp
未分区。设置 preservesPartitioning
无效。
我想通过应用广播变量加入两个集合。我正在尝试实施 Spark: what's the best strategy for joining a 2-tuple-key RDD with single-key RDD?
的第一个建议val emp_newBC = sc.broadcast(emp_new.collectAsMap())
val joined = emp.mapPartitions({ iter =>
val m = emp_newBC.value
for {
((t, w)) <- iter
if m.contains(t)
} yield ((w + '-' + m.get(t).get),1)
}, preservesPartitioning = true)
然而,如此处所述:
val emp_newBC = sc.broadcast(emp_new.collect())
val joined = emp.mapPartitions({ iter =>
val m = emp_newBC.value
for {
((t, w)) <- iter
if m.contains(t)
amk = m.indexOf(t)
} yield ((w + '-' + emp_newBC.value(amk)),1) //yield ((t, w), (m.get(t).get)) //((w + '-' + m.get(t).get),1)
}, preservesPartitioning = true)
但是好像m.contains(t)没有反应。我该如何补救?
提前致谢。
这样的怎么样?
val emp_newBC = sc.broadcast(emp_new.groupByKey.collectAsMap)
val joined = emp.mapPartitions(iter => for {
(k, v1) <- iter
v2 <- emp_newBC.value.getOrElse(k, Iterable())
} yield (s"$v1-$v2", 1))
关于您的代码...据我了解 emp_new
是 RDD[(String, String)]
。当它被收集时,你会得到一个 Array[(String, String)]
。当你使用
((t, w)) <- iter
t
是一个 String
所以 m.contains(t)
总是 return false
.
我看到的另一个问题是 mapPartitions
里面的 preservesPartitioning = true
。有几种可能的情况:
emp
已分区,您希望joined
也已分区。由于您将键从t
更改为某些新值分区无法保留,因此必须重新分区结果RDD
。如果你使用preservesPartitioning = true
输出RDD
将以错误的分区结束。emp
已分区,但您不需要为joined
分区。没有理由使用preservesPartitioning
.emp
未分区。设置preservesPartitioning
无效。