连接顶点时,我是否被迫使用 MEMORY_ONLY 缓存?
When joining vertices, am I forced to use MEMORY_ONLY caching?
查看 outerJoinVertices
的来源
我想知道这是错误还是功能
override def outerJoinVertices[U: ClassTag, VD2: ClassTag]
(other: RDD[(VertexId, U)])
(updateF: (VertexId, VD, Option[U]) => VD2)
(implicit eq: VD =:= VD2 = null): Graph[VD2, ED] = {
// The implicit parameter eq will be populated by the compiler if VD and VD2 are equal, and left
// null if not
if (eq != null) {
vertices.cache() // <===== what if I wanted it serialized?
// updateF preserves type, so we can use incremental replication
val newVerts = vertices.leftJoin(other)(updateF).cache()
val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]]
.updateVertices(changedVerts)
new GraphImpl(newVerts, newReplicatedVertexView)
} else {
// updateF does not preserve type, so we must re-replicate all vertices
val newVerts = vertices.leftJoin(other)(updateF)
GraphImpl(newVerts, replicatedVertexView.edges)
}
}
问题
如果我的图/连接的顶点已经通过另一个 StorageLevel
(例如 MEMORY_ONLY_SER
)缓存 - 这是导致 org.apache.spark.graphx.impl.ShippableVertexPartitionOps ... WARN ShippableVertexPartitionOps: Joining two VertexPartitions with different indexes is slow.
的原因吗?
如果是这种情况,那么这是 Spark 中的一个错误(这是从 1.3.1 开始的)?找不到关于此的 JIRA 问题(但我没仔细看...)
为什么修复不像为该方法提供一个新的 StorageLevel 那样简单?
有什么解决方法? (我能想到的一个是用 vertices.join(otherVertices) 和 originalGraph.edges 或其他东西创建一个新图形......但感觉不对......
好吧,我认为这实际上不是错误。
查看 VertexRDD
的代码,它覆盖了缓存方法,并使用用于创建此顶点的原始 StorageLevel
。
override def cache(): this.type = {
partitionsRDD.persist(targetStorageLevel)
this
}
查看 outerJoinVertices
我想知道这是错误还是功能
override def outerJoinVertices[U: ClassTag, VD2: ClassTag]
(other: RDD[(VertexId, U)])
(updateF: (VertexId, VD, Option[U]) => VD2)
(implicit eq: VD =:= VD2 = null): Graph[VD2, ED] = {
// The implicit parameter eq will be populated by the compiler if VD and VD2 are equal, and left
// null if not
if (eq != null) {
vertices.cache() // <===== what if I wanted it serialized?
// updateF preserves type, so we can use incremental replication
val newVerts = vertices.leftJoin(other)(updateF).cache()
val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]]
.updateVertices(changedVerts)
new GraphImpl(newVerts, newReplicatedVertexView)
} else {
// updateF does not preserve type, so we must re-replicate all vertices
val newVerts = vertices.leftJoin(other)(updateF)
GraphImpl(newVerts, replicatedVertexView.edges)
}
}
问题
如果我的图/连接的顶点已经通过另一个
StorageLevel
(例如MEMORY_ONLY_SER
)缓存 - 这是导致org.apache.spark.graphx.impl.ShippableVertexPartitionOps ... WARN ShippableVertexPartitionOps: Joining two VertexPartitions with different indexes is slow.
的原因吗?如果是这种情况,那么这是 Spark 中的一个错误(这是从 1.3.1 开始的)?找不到关于此的 JIRA 问题(但我没仔细看...)
为什么修复不像为该方法提供一个新的 StorageLevel 那样简单?
有什么解决方法? (我能想到的一个是用 vertices.join(otherVertices) 和 originalGraph.edges 或其他东西创建一个新图形......但感觉不对......
好吧,我认为这实际上不是错误。
查看 VertexRDD
的代码,它覆盖了缓存方法,并使用用于创建此顶点的原始 StorageLevel
。
override def cache(): this.type = {
partitionsRDD.persist(targetStorageLevel)
this
}