Spark 广播与加入

Spark broadcast vs join

我有一个很大的 RDD (rdd_1) 和它的过滤子集 (rdd_2)。我想在不同的领域加入 rdd_1 和 rdd_2。

假设记录的格式为 {'first_name':<>, 'last_name':<>}。我们想要找到与所有“jack”具有相同姓氏的所有名称。

names = sc.textfile(RAW_DATA)
jack = names.filter(lambda v: v['first_name'] == 'jack')

选项 1

jack_last_names = jack.map(operator.itergetter('last_name').distinct().collect()
last_names_bc = sc.broadcast(set(jack_last_names))
final = names.filter(lambda v:v['last_name'] in last_names_bc.value)

目前,我广播 rdd_2 并通过它过滤 rdd_1。麻烦的是,为了广播 rdd_2,我必须先在驱动程序上收集()它,这会导致驱动程序 运行 内存不足。

有没有一种方法可以广播 RDD 而无需先在驱动程序上收集()?

选项 2

final = jack.keyBy(operator.itemgetter('last_name').join(names.keyBy(operator.itemgetter('last_name')

我的另一个选择是 rdd_1.join(rdd_2),但是 rdd_1 太大了,无法随机播放。

当我们 运行 rdd_1.join(rdd_2) 同时对 rdd_1 和 rdd_2 进行哈希分区和洗牌时?

谢谢!

Is there a way to broadcast an RDD without first collect()ing it on the driver?

不,没有,即使有也不能解决您的问题。

  • 无法执行嵌套操作或转换
  • 如果你可以在没有收集的情况下创建一个本地广播变量,你会面临同样的问题,但在工人身上

When we run rdd_1.join(rdd_2) do both rdd_1 and rdd_2 get hash partitioned and shuffled?

从技术上讲,在 PySpark 中,它需要 union 后跟 groupByKey,因此这意味着所有数据都必须重新洗牌。

在实践中,我会简单地接受洗牌的成本。一般来说,不可能编写任何复杂的应用程序并完全避免改组。此外,它并不比 更昂贵,甚至不会通过复制将数据复制到分布式文件系统。