如何在 Spark GraphX 中正确使用广播变量?

How to use Broadcast variable correctly in Spark GraphX?

我使用 GraphX 处理图表。我使用 GraphLoader 加载它,并使用以下代码创建了一个包含每个节点的邻居的变量:

val all_neighbors: VertexRDD[Array[VertexId]] = graph.collectNeighborIds(EdgeDirection.Either).cache()

因为我经常需要节点邻居,所以我决定广播它们。当我使用此代码时出现错误:

val broadcastVar = sc.broadcast(all_neighbors)

但是当我使用这段代码时没有错误:

val broadcastVar = sc.broadcast(all_neighbors.collect())

使用collect()进行广播是否正确?

还有一个问题。我想将此广播变量更改为键值。这个代码对吗?

val nvalues = broadcastVar.value.toMap

上面的代码(我的意思是nvalues)是否广播到集群中的所有slave?我也应该广播 nvalues 吗??我对广泛的主题有点困惑。请帮我解决这个问题。

有两个问题:

is it right to use collect() for broadcasting??

all_neighbors 是 VertexRDD 类型,它本质上是一个 RDD。 RDD 中没有任何内容可以广播。 RDD 是一种数据结构,描述了对某些数据集的分布式计算。通过 RDD 的特性,您可以描述计算什么以及如何计算。它是一个抽象实体。你只能广播一个真实的值,但 RDD 只是一个值的容器,只有当执行者处理它们的数据时才可用。

引用自Broadcast Variables

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.

This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.

这就是我们需要执行 collect RDD 持有的数据集的原因,该数据集将 RDD 转换为本地可用的集合,然后可以广播。

注意:当您执行collect操作时,数据会在驱动节点中累积,然后进行广播。所以如果driver节点中的space少了,就会抛出错误

does the above code(i means nvalues) broadcast to all slaves in cluster?? should I broadcast nvalues too??

这完全取决于您的用例。如果你只想使用 broadcastVar,那么只广播它,或者如果你想使用 nvalues,只广播 nvalues,否则你可以广播这两个值,但你需要小心内存限制。

如果有帮助请告诉我!!