Scala-Spark: Convert Dataframe to RDD[Edge]
I have a dataframe that represents the edges of a graph; this is the schema:
root
 |-- src: string (nullable = true)
 |-- dst: string (nullable = true)
 |-- relationship: struct (nullable = false)
 |    |-- business_id: string (nullable = true)
 |    |-- normalized_influence: double (nullable = true)
I want to convert it to an RDD[Edge] to use with the Pregel API; my difficulty is with the "relationship" attribute. How can I convert it?
Edge is a parameterized class, which means that besides the source and destination IDs you can store whatever you like on each edge. In your case that would be Edge[Relationship]. You can use case classes to map between the dataframe and RDD[Edge[Relationship]]:
import scala.util.hashing.MurmurHash3
import org.apache.spark.graphx.Edge
import org.apache.spark.rdd.RDD
import spark.implicits._ // needed for df.as[MyEdge]

case class Relationship(business_id: String, normalized_influence: Double)
case class MyEdge(src: String, dst: String, relationship: Relationship)

val edges: RDD[Edge[Relationship]] = df.as[MyEdge].rdd.map { edge =>
  Edge(
    MurmurHash3.stringHash(edge.src).toLong, // VertexId is a Long, so we hash your string IDs
    MurmurHash3.stringHash(edge.dst).toLong,
    edge.relationship
  )
}
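To actually run Pregel you also need a vertex RDD keyed by the same hashed IDs, so that edge endpoints and vertex IDs line up. A minimal sketch, assuming the same df, the case classes above, and a SparkSession named spark (names are illustrative, not from your code):

```scala
import scala.util.hashing.MurmurHash3
import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.rdd.RDD
import spark.implicits._

// Collect every distinct string ID appearing as a source or destination,
// and hash it with the same function used for the edges, keeping the
// original string as the vertex attribute.
val vertices: RDD[(VertexId, String)] = df
  .select("src").union(df.select("dst"))
  .distinct()
  .as[String]
  .rdd
  .map(id => (MurmurHash3.stringHash(id).toLong, id))

val graph: Graph[String, Relationship] = Graph(vertices, edges)
```

One caveat: MurmurHash3.stringHash is a 32-bit hash, so distinct string IDs can in principle collide; for large graphs you may prefer building an explicit string-to-Long index (e.g. via zipWithUniqueId) instead of hashing.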