创建数据并将其附加到 spark graphx java
Create and append data to spark graphx java
我有 Java 个对象 Transaction(Object buyer, Object dealer, Int payed) 到 java spark 应用程序中,我想制作一个图表 ( with graphx) 其中 Vertex 是买家和经销商,payed 是边。
此外,如何添加另一笔传入交易?
您可以通过提供所需的 vertices
和 edges
:
手动构建图表
JavaRDD<Tuple2<Long, String>> vertices = sc.parallelize(new
Lists.newArrayList({
Tuple2(1L, "one"), Tuple2(2L, "two"),
Tuple2(3L, "three"), Tuple2(4L, "four")})
);
JavaRDD<Edge> relationships =
sc.parallelize(Lists.newArrayList({
Edge(1L, 2L, 1.0), Edge(1L, 4L, 2.0),
Edge(2L, 4L, 3.0), Edge(3L, 1L, 1.0),
Edge(3L, 4L, 5.0)}
);
Graph(relationships, vertices, StorageLevel.MEMORY_ONLY(), StorageLevel.MEMORY_ONLY(), stringTag, stringTag);
此外,您可以将此方法应用于 CSV
、TSV
或任何适合您的文件格式。
如果你想使用 Dataframe API
进行图表数据处理,你绝对应该看看 GraphFrames。
UPD
- 如何在 java 中初始化它?
导入Graph
class:
import org.apache.spark.graphx.Graph
Graph 是用顶点和边类型输入的 - Graph<VD,ED>
所以你用顶点,边,存储级别和类型标签初始化这个 class - 这是 [=66 的要求=] API 在 Scala 中你可以只指定顶点和边。
System.setProperty("hadoop.home.dir", "C:\softwares\Winutils");
SparkConf conf = new SparkConf().setMaster("local").setAppName("graph");
JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
ClassTag<String> stringTag = scala.reflect.ClassTag$.MODULE$.apply(String.class);
List<Edge<String>> edges = new ArrayList<>();
edges.add(new Edge<String>(1, 2, "Friend1"));
edges.add(new Edge<String>(2, 3, "Friend2"));
edges.add(new Edge<String>(1, 3, "Friend3"));
edges.add(new Edge<String>(4, 3, "Friend4"));
edges.add(new Edge<String>(4, 5, "Friend5"));
edges.add(new Edge<String>(2, 5, "Friend6"));
JavaRDD<Edge<String>> edgeRDD = javaSparkContext.parallelize(edges);
Graph<String, String> graph = Graph.fromEdges(edgeRDD.rdd(), "",StorageLevel.MEMORY_ONLY(), StorageLevel.MEMORY_ONLY(), stringTag, stringTag);
graph.vertices().toJavaRDD().collect().forEach(System.out::println);
- 当一个新的交易来了,想法是把它加到graph中,有类似"add"的方法to graph?
Apache Spark RDD 不是为细粒度更新而设计的。所有对 RDD 的操作都是关于改变整个 RDD 的。如果更新操作有一个大图,它可能会因为频繁 shuffle operation 而导致巨大的内存消耗。但是如果你不想经常重建一个图,你可以简单地从以前的图实例中复制 vertices/edges 并向它附加更新。如果您的用例是频繁的数据更新,您可能最好使用不同的方法 - 在数据库端处理更新:Cassandra
、HBase
专为稳健的 insert/update 操作而设计,如果您需要在数据库中存储图形 - 考虑 Neo4J
。
您可以将上面提到的 NoSQL DB 的用法与 Spark GraphX 结合使用 - 通过调度程序将数据提取到 Graph
,通过 Spark SQL
获取来自 Kafka 的消息等的一些信号,然后进一步加载到Graph
和必要的处理(我有 the example 这一步与最流行的 scala 算法,不难移动到 java)。
我有 Java 个对象 Transaction(Object buyer, Object dealer, Int payed) 到 java spark 应用程序中,我想制作一个图表 ( with graphx) 其中 Vertex 是买家和经销商,payed 是边。 此外,如何添加另一笔传入交易?
您可以通过提供所需的 vertices
和 edges
:
JavaRDD<Tuple2<Long, String>> vertices = sc.parallelize(new
Lists.newArrayList({
Tuple2(1L, "one"), Tuple2(2L, "two"),
Tuple2(3L, "three"), Tuple2(4L, "four")})
);
JavaRDD<Edge> relationships =
sc.parallelize(Lists.newArrayList({
Edge(1L, 2L, 1.0), Edge(1L, 4L, 2.0),
Edge(2L, 4L, 3.0), Edge(3L, 1L, 1.0),
Edge(3L, 4L, 5.0)}
);
Graph(relationships, vertices, StorageLevel.MEMORY_ONLY(), StorageLevel.MEMORY_ONLY(), stringTag, stringTag);
此外,您可以将此方法应用于 CSV
、TSV
或任何适合您的文件格式。
如果你想使用 Dataframe API
进行图表数据处理,你绝对应该看看 GraphFrames。
UPD
- 如何在 java 中初始化它?
导入Graph
class:
import org.apache.spark.graphx.Graph
Graph 是用顶点和边类型输入的 - Graph<VD,ED>
所以你用顶点,边,存储级别和类型标签初始化这个 class - 这是 [=66 的要求=] API 在 Scala 中你可以只指定顶点和边。
System.setProperty("hadoop.home.dir", "C:\softwares\Winutils");
SparkConf conf = new SparkConf().setMaster("local").setAppName("graph");
JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
ClassTag<String> stringTag = scala.reflect.ClassTag$.MODULE$.apply(String.class);
List<Edge<String>> edges = new ArrayList<>();
edges.add(new Edge<String>(1, 2, "Friend1"));
edges.add(new Edge<String>(2, 3, "Friend2"));
edges.add(new Edge<String>(1, 3, "Friend3"));
edges.add(new Edge<String>(4, 3, "Friend4"));
edges.add(new Edge<String>(4, 5, "Friend5"));
edges.add(new Edge<String>(2, 5, "Friend6"));
JavaRDD<Edge<String>> edgeRDD = javaSparkContext.parallelize(edges);
Graph<String, String> graph = Graph.fromEdges(edgeRDD.rdd(), "",StorageLevel.MEMORY_ONLY(), StorageLevel.MEMORY_ONLY(), stringTag, stringTag);
graph.vertices().toJavaRDD().collect().forEach(System.out::println);
- 当一个新的交易来了,想法是把它加到graph中,有类似"add"的方法to graph?
Apache Spark RDD 不是为细粒度更新而设计的。所有对 RDD 的操作都是关于改变整个 RDD 的。如果更新操作有一个大图,它可能会因为频繁 shuffle operation 而导致巨大的内存消耗。但是如果你不想经常重建一个图,你可以简单地从以前的图实例中复制 vertices/edges 并向它附加更新。如果您的用例是频繁的数据更新,您可能最好使用不同的方法 - 在数据库端处理更新:Cassandra
、HBase
专为稳健的 insert/update 操作而设计,如果您需要在数据库中存储图形 - 考虑 Neo4J
。
您可以将上面提到的 NoSQL DB 的用法与 Spark GraphX 结合使用 - 通过调度程序将数据提取到 Graph
,通过 Spark SQL
获取来自 Kafka 的消息等的一些信号,然后进一步加载到Graph
和必要的处理(我有 the example 这一步与最流行的 scala 算法,不难移动到 java)。