Spark GraphX multiple edge types
I recently started working with Spark. Currently I am testing a bipartite graph with different vertex and edge types.
From the research I have done on GraphX, to have different edge types, some of which carry properties, I need to subclass the edges.
Here is the code snippet:
scala> trait VertexProperty
defined trait VertexProperty
scala> case class paperProperty(val paperid: Long, val papername: String, val doi: String, val keywords: String) extends VertexProperty
defined class paperProperty
scala> case class authorProperty(val authorid: Long, val authorname: String) extends VertexProperty
defined class authorProperty
scala> val docsVertces: RDD[(VertexId, VertexProperty)] = docs.rdd.map(x => (x(0).asInstanceOf[VertexId],paperProperty(x(0).asInstanceOf[VertexId],x(1).asInstanceOf[String],x(2).asInstanceOf[String],x(3).asInstanceOf[String])))
docsVertces: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, VertexProperty)] = MapPartitionsRDD[23] at map at <console>:47
scala> val authorVertces: RDD[(VertexId, VertexProperty)] = authors.rdd.map(x => (x(0).asInstanceOf[VertexId],authorProperty(x(0).asInstanceOf[Long],x(1).asInstanceOf[String])))
authorVertces: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, VertexProperty)] = MapPartitionsRDD[24] at map at <console>:41
scala> val vertices = VertexRDD(docsVertces ++ authorVertces)
vertices: org.apache.spark.graphx.VertexRDD[VertexProperty] = VertexRDDImpl[28] at RDD at VertexRDD.scala:57
scala>
But then my edges fail:
scala> class EdgeProperty()
defined class EdgeProperty
scala> case class authorEdgeProperty( val doccount: Long) extends EdgeProperty()
defined class authorEdgeProperty
scala> case class citeEdgeProperty() extends EdgeProperty()
defined class citeEdgeProperty
scala> // edges using subclasses will not work; we need one consistent superclass
scala> val docauthoredges = docauthor.map(x => Edge(x(0).asInstanceOf[VertexId],x(1).asInstanceOf[VertexId], authorEdgeProperty(x(1).asInstanceOf[Long])))
docauthoredges: org.apache.spark.sql.Dataset[org.apache.spark.graphx.Edge[authorEdgeProperty]] = [srcId: bigint, dstId: bigint ... 1 more field]
scala> val docciteedges = doccites.map(x => Edge(x(0).asInstanceOf[VertexId],x(1).asInstanceOf[VertexId], citeEdgeProperty()))
docciteedges: org.apache.spark.sql.Dataset[org.apache.spark.graphx.Edge[citeEdgeProperty]] = [srcId: bigint, dstId: bigint ... 1 more field]
scala> docauthoredges.unionAll(docciteedges)
<console>:52: error: type mismatch;
found : org.apache.spark.sql.Dataset[org.apache.spark.graphx.Edge[citeEdgeProperty]]
required: org.apache.spark.sql.Dataset[org.apache.spark.graphx.Edge[authorEdgeProperty]]
docauthoredges.unionAll(docciteedges)
^
scala>
I tried casting the edges to their superclass and received the following message:
scala> val docauthoredges = docauthor.map(x => Edge(x(0).asInstanceOf[VertexId],x(1).asInstanceOf[VertexId], authorEdgeProperty(x(1).asInstanceOf[Long]).asInstanceOf[EdgeProperty]))
java.lang.UnsupportedOperationException: No Encoder found for EdgeProperty
- field (class: "EdgeProperty", name: "attr")
- root class: "org.apache.spark.graphx.Edge"
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:598)
...
Any help would be greatly appreciated.
Your attempt is somewhat futile, because GraphX doesn't support Datasets and both edges and vertices should be passed as RDDs. But for the sake of argument:
- You get the first exception because distributed data structures in Spark are invariant. Don't use asInstanceOf; just be explicit with type annotations.
- You get the second exception because Datasets are further restricted by their use of Encoders. All objects in a Dataset must use the same Encoder, which in this case is possible only with a binary encoder, and that encoder is not implicitly accessible for user-defined classes.
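The invariance point from the first bullet can be sketched directly on the RDD side, which is what GraphX actually consumes. The names and sample values below are illustrative, and a live SparkContext `sc` is assumed:

```scala
import org.apache.spark.graphx.Edge
import org.apache.spark.rdd.RDD

sealed trait EdgeProperty
case class AuthorEdgeProperty(doccount: Long) extends EdgeProperty
case class CiteEdgeProperty() extends EdgeProperty

// Annotate the RDD element type as Edge[EdgeProperty] up front; because
// RDDs are invariant, an RDD[Edge[AuthorEdgeProperty]] would not unify
// with an RDD[Edge[CiteEdgeProperty]] later, no matter how you cast.
val authorEdges: RDD[Edge[EdgeProperty]] =
  sc.parallelize(Seq(Edge(1L, 2L, AuthorEdgeProperty(3L))))
val citeEdges: RDD[Edge[EdgeProperty]] =
  sc.parallelize(Seq(Edge(2L, 3L, CiteEdgeProperty())))

// Both RDDs now share the same element type, so they combine cleanly.
val allEdges: RDD[Edge[EdgeProperty]] = authorEdges ++ citeEdges
```

With the annotation on the `val`, the compiler infers `Edge[EdgeProperty]` for each element, so no `asInstanceOf` is needed anywhere.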
Combining the two parts:
import org.apache.spark.sql.{Dataset, Encoders}
sealed trait EdgeProperty
case class AuthorEdgeProperty(val doccount: Long) extends EdgeProperty
case class CiteEdgeProperty() extends EdgeProperty
val docauthoredges: Dataset[EdgeProperty] = spark.range(10)
.map(AuthorEdgeProperty(_): EdgeProperty)(Encoders.kryo[EdgeProperty])
val docciteedges: Dataset[EdgeProperty] = spark.range(5)
.map(_ => CiteEdgeProperty(): EdgeProperty)(Encoders.kryo[EdgeProperty])
val edges: Dataset[EdgeProperty] = docauthoredges.union(docciteedges)
Convert to an RDD to make it usable in GraphX:
edges.rdd
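To close the loop with GraphX: note that `edges` above holds bare `EdgeProperty` values, so for an actual graph they would still need to be wrapped in `Edge` with real source and destination ids. A rough sketch, with made-up ids and a made-up default vertex attribute (`UnknownVertex` is not from the question):

```scala
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD

// Hypothetical default attribute for vertex ids that appear only in edges.
case object UnknownVertex extends VertexProperty

// Placeholder ids derived from the index; in practice the src/dst ids
// would come from the source data alongside each property.
val edgeRdd: RDD[Edge[EdgeProperty]] =
  edges.rdd.zipWithIndex.map { case (attr, i) => Edge(i, i + 1, attr) }

val graph: Graph[VertexProperty, EdgeProperty] =
  Graph(vertices, edgeRdd, UnknownVertex)
```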