counting vertices on a titan graph using SparkGraphComputer throws org.apache.spark.SparkException: Job aborted due to stage failure:

When trying to count the number of vertices of a Titan graph on a cluster using SparkGraphComputer, I get an error that I don't know how to deal with. I am using TinkerPop 3.1.1-incubating and Titan 1.1.0-SNAPSHOT in my code, and on the cluster I have installed DataStax Community 2.1.11 and spark-1.5.2-bin-hadoop2.6.

I have put together a minimal Java example to reproduce my problem:

private void strippedDown() {
    // a normal titan cluster
    String titanClusterConfig = "titan-cassandra-test-cluster.properties";

    // a hadoop graph with cassandra as input and gryo as output
    String sparkClusterConfig = "titan-cassandra-test-spark.properties";

    String edgeLabel = "blank";

    // add a graph
    int n = 100;
    Graph titanGraph = GraphFactory.open(titanClusterConfig);
    Vertex superNode = titanGraph.addVertex(T.label, String.valueOf(0));
    for (int i = 1; i < n; i++) {
        Vertex currentNode = titanGraph.addVertex(T.label, String.valueOf(i));
        currentNode.addEdge(edgeLabel, superNode);
    }
    titanGraph.tx().commit();

    //count with titan
    Long count = titanGraph.traversal().V().count().next();
    System.out.println("The number of vertices in the graph is: "+count);

    // count the graph using titan graph computer
    count = titanGraph.traversal(GraphTraversalSource.computer(FulgoraGraphComputer.class)).V().count().next();
    System.out.println("The number of vertices in the graph is: "+count);

    // count the graph using spark graph computer
    Graph sparkGraph = GraphFactory.open(sparkClusterConfig);
    count = sparkGraph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().count().next();
    System.out.println("The number of vertices in the graph is: "+count);
}

The counts using OLTP and using OLAP with the FulgoraGraphComputer return the correct answer. The OLAP count using the SparkGraphComputer, however, throws org.apache.spark.SparkException: Job aborted due to stage failure:

Interestingly, if I run an analogous script through the Gremlin console packaged with Titan, I get a different error for what appears to be the same algorithm:

graph = GraphFactory.open('titan-cassandra-test-cluster.properties')
graph.addVertex(T.label,"0")
graph.addVertex(T.label,"1")
graph.addVertex(T.label,"2")
graph.tx().commit()
sparkGraph = GraphFactory.open('titan-cassandra-test-spark.properties')
sparkGraph.traversal(computer(SparkGraphComputer)).V().count()

This throws org.apache.thrift.protocol.TProtocolException: Required field 'keyspace' was not present! Struct: set_keyspace_args(keyspace:null) twice, but it completes and returns 0, which is incorrect.

I am aware of this article from the mailing list, but I have trouble understanding it and could not solve the problem. Could anyone explain to me what is happening and how to fix it? I have pasted my configurations below.

gremlin.graph=com.thinkaurelius.titan.core.TitanFactory
storage.backend=cassandrathrift
storage.hostname=node1
storage.cassandra.keyspace=mindmapstest
storage.cassandra.replication-factor=3
cache.db-cache = true
cache.db-cache-clean-wait = 20
cache.db-cache-time = 180000
cache.db-cache-size = 0.5

gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=com.thinkaurelius.titan.hadoop.formats.cassandra.CassandraInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=none
####################################
# Cassandra Cluster Config         #
####################################
titanmr.ioformat.conf.storage.backend=cassandrathrift
titanmr.ioformat.conf.storage.cassandra.keyspace=mindmapstest
titanmr.ioformat.conf.storage.hostname=node1,node2,node3
####################################
# SparkGraphComputer Configuration #
####################################
spark.master=spark://node1:7077
spark.executor.memory=250m
spark.serializer=org.apache.spark.serializer.KryoSerializer
####################################
# Apache Cassandra InputFormat configuration
####################################
cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner

EDIT: this script will reproduce the error

graph = TitanFactory.open('titan-cassandra-test-cluster.properties')
superNode = graph.addVertex(T.label,"0")
for(i in 1..100) {
  currentNode = graph.addVertex(T.label,i.toString())
  currentNode.addEdge("blank",superNode)
}
graph.tx().commit()

graph.traversal().V().count()

graph.traversal(computer()).V().count()

sparkGraph = GraphFactory.open('titan-cassandra-test-spark.properties')
sparkGraph.traversal(computer(SparkGraphComputer)).V().count()

Try adding these parameters to your HadoopGraph configuration.

#
# Titan Cassandra InputFormat configuration
# see https://github.com/thinkaurelius/titan/blob/titan11/titan-hadoop-parent/titan-hadoop-core/src/main/java/com/thinkaurelius/titan/hadoop/formats/cassandra/CassandraBinaryInputFormat.java
titanmr.ioformat.conf.storage.backend=cassandrathrift
titanmr.ioformat.conf.storage.hostname=node1,node2,node3
titanmr.ioformat.conf.storage.cassandra.keyspace=titan
titanmr.ioformat.cf-name=edgestore

#
# Apache Cassandra InputFormat configuration
# see https://github.com/apache/cassandra/blob/cassandra-2.2.3/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
# see https://github.com/thinkaurelius/titan/blob/titan11/titan-hadoop-parent/titan-hadoop-core/src/main/java/com/thinkaurelius/titan/hadoop/formats/cassandra/CassandraBinaryInputFormat.java
# not clear why these need to be set manually when using cassandrathrift
cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner
cassandra.input.keyspace=titan
cassandra.input.predicate=0c00020b0001000000000b000200000000020003000800047fffffff0000
cassandra.input.columnfamily=edgestore
cassandra.range.batch.size=2147483647

I am not 100% sure that the reasons I give are correct, but I have managed to solve the problem. My problem stemmed from three underlying issues, all of them configuration related.

1) The first problem was kindly solved by Jason, and it related to having the correct configuration options for connecting to Cassandra. I am still curious what they actually do.

2) The reason I could not get the Java code to run successfully was that I had not set the HADOOP_GREMLIN_LIBS environment variable correctly. Because of this, the jars were not being deployed to the cluster for use by the graph computer. Once this was set, the Gremlin console and the Java example ran into the same problem: a returned count of zero.
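For reference, setting the variable before launching the console or Java program looks roughly like this (the path is an example only; point it at the lib directory of wherever your Titan/TinkerPop distribution is actually installed):

```shell
# Example path; substitute your own Titan distribution's lib directory.
export HADOOP_GREMLIN_LIBS=/opt/titan-1.1.0-hadoop1/lib
```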

3) The count of zero was the hardest to solve, and again it was a case of not understanding the manual. There are lots of references to having Hadoop installed on my cluster, but nowhere does it say how to connect to Hadoop on the cluster. In order to do this, an extra configuration option, fs.defaultFS, is required, which tells Gremlin where to find the Hadoop filesystem on the cluster. Once this was set correctly, the count of vertices was correct.

My theory is that the computation executed correctly, but when the time came to reduce the counts from the Spark workers, they were persisted somewhere on the cluster; then, when returning the answer to the console, the local filesystem was checked and nothing was found, hence the returned count of zero. Perhaps this is a bug?
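One hypothetical way to sanity-check this theory (assuming a Hadoop client is on the PATH, and using the fs.defaultFS and gremlin.hadoop.outputLocation values from my final config) would be to list the output location on HDFS after a run and see whether anything was persisted there:

```shell
# Hypothetical diagnostic: if the reduced counts were persisted on the cluster
# filesystem rather than locally, files should appear under the configured
# output location after a SparkGraphComputer run.
hdfs dfs -ls hdfs://node1:9000/test/output
```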

In summary, the final configuration file I needed is this:

gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=com.thinkaurelius.titan.hadoop.formats.cassandra.CassandraInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=/test/output
####################################
# Cassandra Cluster Config         #
####################################
titanmr.ioformat.conf.storage.backend=cassandrathrift
titanmr.ioformat.conf.storage.cassandra.keyspace=mindmapstest
titanmr.ioformat.conf.storage.hostname=node1,node2,node3
titanmr.ioformat.cf-name=edgestore
####################################
# SparkGraphComputer Configuration #
####################################
spark.master=spark://node1:7077
spark.executor.memory=1g
spark.serializer=org.apache.spark.serializer.KryoSerializer
####################################
# Apache Cassandra InputFormat configuration
####################################
cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner
cassandra.input.keyspace=mindmapstest
cassandra.input.predicate=0c00020b0001000000000b000200000000020003000800047fffffff0000
cassandra.input.columnfamily=edgestore
cassandra.range.batch.size=2147483647
spark.eventLog.enabled=true
####################################
# Hadoop Cluster configuration     #
####################################
fs.defaultFS=hdfs://node1:9000