创建数据并将其附加到 spark graphx java

Create and append data to spark graphx java

我有 Java 个对象 Transaction(Object buyer, Object dealer, Int payed) 到 java spark 应用程序中,我想制作一个图表 ( with graphx) 其中 Vertex 是买家和经销商,payed 是边。 此外,如何添加另一笔传入交易?

您可以通过提供所需的 verticesedges:

手动构建图表
    JavaRDD<Tuple2<Long, String>> vertices = sc.parallelize(new 
     Lists.newArrayList({
      Tuple2(1L, "one"), Tuple2(2L, "two"),
      Tuple2(3L, "three"), Tuple2(4L, "four")})
    );

    JavaRDD<Edge> relationships =
      sc.parallelize(Lists.newArrayList({
        Edge(1L, 2L, 1.0), Edge(1L, 4L, 2.0),
        Edge(2L, 4L, 3.0), Edge(3L, 1L, 1.0),
        Edge(3L, 4L, 5.0)}
      );

Graph(relationships, vertices, StorageLevel.MEMORY_ONLY(), StorageLevel.MEMORY_ONLY(), stringTag, stringTag);

此外,您可以将此方法应用于 CSVTSV 或任何适合您的文件格式。

如果你想使用 Dataframe API 进行图表数据处理,你绝对应该看看 GraphFrames

UPD

  1. 如何在 java 中初始化它?

导入Graphclass:

import org.apache.spark.graphx.Graph

Graph 是用顶点和边类型输入的 - Graph<VD,ED> 所以你用顶点,边,存储级别和类型标签初始化这个 class - 这是 [=66 的要求=] API 在 Scala 中你可以只指定顶点和边。

        System.setProperty("hadoop.home.dir", "C:\softwares\Winutils");
        SparkConf conf = new SparkConf().setMaster("local").setAppName("graph");
        JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
        ClassTag<String> stringTag = scala.reflect.ClassTag$.MODULE$.apply(String.class);


        List<Edge<String>> edges = new ArrayList<>();

        edges.add(new Edge<String>(1, 2, "Friend1"));
        edges.add(new Edge<String>(2, 3, "Friend2"));
        edges.add(new Edge<String>(1, 3, "Friend3"));
        edges.add(new Edge<String>(4, 3, "Friend4"));
        edges.add(new Edge<String>(4, 5, "Friend5"));
        edges.add(new Edge<String>(2, 5, "Friend6"));


        JavaRDD<Edge<String>> edgeRDD = javaSparkContext.parallelize(edges);


        Graph<String, String> graph = Graph.fromEdges(edgeRDD.rdd(), "",StorageLevel.MEMORY_ONLY(), StorageLevel.MEMORY_ONLY(), stringTag, stringTag);


        graph.vertices().toJavaRDD().collect().forEach(System.out::println);
  1. 当一个新的交易来了,想法是把它加到graph中,有类似"add"的方法to graph?

Apache Spark RDD 不是为细粒度更新而设计的。所有对 RDD 的操作都是关于改变整个 RDD 的。如果更新操作有一个大图,它可能会因为频繁 shuffle operation 而导致巨大的内存消耗。但是如果你不想经常重建一个图,你可以简单地从以前的图实例中复制 vertices/edges 并向它附加更新。如果您的用例是频繁的数据更新,您可能最好使用不同的方法 - 在数据库端处理更新:CassandraHBase 专为稳健的 insert/update 操作而设计,如果您需要在数据库中存储图形 - 考虑 Neo4J

您可以将上面提到的 NoSQL DB 的用法与 Spark GraphX 结合使用 - 通过调度程序将数据提取到 Graph,通过 Spark SQL 获取来自 Kafka 的消息等的一些信号,然后进一步加载到Graph 和必要的处理(我有 the example 这一步与最流行的 scala 算法,不难移动到 java)。