Generalizing DynamoDB + JanusGraph Factory: Lock and Schema Problems
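I'm working on generalizing the DynamoDB + JanusGraph tutorial from AWS so that, given a standard .txt file with standard conventions, the program can ingest the data (as triples) and create the vertices, properties, and edges. Normally I wouldn't post such a long question, but it looks like these problems all relate to the same 4-5 lines in a class I created, ObjectCreationCommand.
An example triple looks like this: "name:Jim Henson \t isCreatorOf \t televisionshow:The Muppets"
- Left object: Jim Henson
- Left object property: name
- Relationship: isCreatorOf
- Right object: The Muppets
- Right object property: televisionshow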
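To make the convention above concrete, here is a minimal sketch of how one such line could be split into its five parts. `ParsedTriple` is a hypothetical helper written only for illustration; it is not the `Triple` class used later in the question, whose implementation is not shown, and it assumes that fields are separated by a tab and that the first colon in a field separates the property name from the value.

```java
// Hypothetical helper (not the Triple class from the question): parses one
// line of the form "property:value <TAB> relationship <TAB> property:value".
final class ParsedTriple {
    final String leftProperty;
    final String leftObject;
    final String relationship;
    final String rightProperty;
    final String rightObject;

    private ParsedTriple(String leftProperty, String leftObject, String relationship,
                         String rightProperty, String rightObject) {
        this.leftProperty = leftProperty;
        this.leftObject = leftObject;
        this.relationship = relationship;
        this.rightProperty = rightProperty;
        this.rightObject = rightObject;
    }

    // Splits the line on tabs, then splits the outer fields on the first colon.
    static ParsedTriple parse(String line) {
        String[] fields = line.split("\t");
        if (fields.length != 3) {
            throw new IllegalArgumentException("Expected 3 tab-separated fields: " + line);
        }
        String[] left = splitPropertyAndValue(fields[0]);
        String[] right = splitPropertyAndValue(fields[2]);
        return new ParsedTriple(left[0], left[1], fields[1].trim(), right[0], right[1]);
    }

    // Assumption: the first ':' separates the property name from the value.
    private static String[] splitPropertyAndValue(String field) {
        String trimmed = field.trim();
        int colon = trimmed.indexOf(':');
        if (colon <= 0 || colon == trimmed.length() - 1) {
            throw new IllegalArgumentException("Expected property:value but got: " + field);
        }
        return new String[] {
            trimmed.substring(0, colon).trim(),
            trimmed.substring(colon + 1).trim()
        };
    }
}
```

Rejecting malformed lines up front keeps bad input from turning into empty property names later in the load.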
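Although the program compiles and runs, it throws several exceptions that prevent the graph from being populated. When I run the Factory program, it reads all of my triples and puts them into a HashSet, but then the following error appears (10 times; this is just one example):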
57338 [pool-10-thread-2] ERROR org.janusgraph.graphdb.database.StandardJanusGraph - Could not commit transaction [10] due to exception
org.janusgraph.diskstorage.locking.TemporaryLockingException: tx 0x181404008c7c already locked key-column ( 16-165-160-114-116- 30- 98-114- 97-110-100-116-121-112-229, 0) when tx 0x181408349015 tried to lock
at com.amazon.janusgraph.diskstorage.dynamodb.AbstractDynamoDBStore.acquireLock(AbstractDynamoDBStore.java:132)
at org.janusgraph.diskstorage.util.MetricInstrumentedStore.call(MetricInstrumentedStore.java:155)
at org.janusgraph.diskstorage.util.MetricInstrumentedStore.call(MetricInstrumentedStore.java:153)
at org.janusgraph.diskstorage.util.MetricInstrumentedStore.runWithMetrics(MetricInstrumentedStore.java:217)
at org.janusgraph.diskstorage.util.MetricInstrumentedStore.acquireLock(MetricInstrumentedStore.java:152)
at org.janusgraph.diskstorage.keycolumnvalue.KCVSProxy.acquireLock(KCVSProxy.java:52)
at org.janusgraph.diskstorage.BackendTransaction.acquireIndexLock(BackendTransaction.java:255)
at org.janusgraph.graphdb.database.StandardJanusGraph.prepareCommit(StandardJanusGraph.java:565)
at org.janusgraph.graphdb.database.StandardJanusGraph.commit(StandardJanusGraph.java:694)
at org.janusgraph.graphdb.transaction.StandardJanusGraphTx.commit(StandardJanusGraphTx.java:1363)
at org.janusgraph.graphdb.database.management.ManagementSystem.commit(ManagementSystem.java:235)
at com.amazon.janusgraph.creator.ObjectCreationCommand.run(ObjectCreationCommand.java:59)
at com.amazon.janusgraph.batch.BatchCommand.run(BatchCommand.java:34)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Next, a similar exception is thrown:
57427 [pool-10-thread-10] ERROR com.amazon.janusgraph.example.MarvelGraphFactory - Error processing line Could not commit transaction due to exception during persistence tx 0x181404008c7c already locked key-column ( 16-165-160-114-116- 30- 98-114- 97-110-100-116-121-112-229, 0) when tx 0x181403f69b77 tried to lock
org.janusgraph.core.JanusGraphException: Could not commit transaction due to exception during persistence
at org.janusgraph.graphdb.transaction.StandardJanusGraphTx.commit(StandardJanusGraphTx.java:1374)
at org.janusgraph.graphdb.database.management.ManagementSystem.commit(ManagementSystem.java:235)
at com.amazon.janusgraph.creator.ObjectCreationCommand.run(ObjectCreationCommand.java:59)
at com.amazon.janusgraph.batch.BatchCommand.run(BatchCommand.java:34)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.janusgraph.core.JanusGraphException: Unexpected exception
at org.janusgraph.graphdb.database.StandardJanusGraph.commit(StandardJanusGraph.java:798)
at org.janusgraph.graphdb.transaction.StandardJanusGraphTx.commit(StandardJanusGraphTx.java:1363)
... 6 more
Caused by: org.janusgraph.diskstorage.locking.TemporaryLockingException: tx 0x181404008c7c already locked key-column ( 16-165-160-114-116- 30- 98-114- 97-110-100-116-121-112-229, 0) when tx 0x181403f69b77 tried to lock
at com.amazon.janusgraph.diskstorage.dynamodb.AbstractDynamoDBStore.acquireLock(AbstractDynamoDBStore.java:132)
at org.janusgraph.diskstorage.util.MetricInstrumentedStore.call(MetricInstrumentedStore.java:155)
at org.janusgraph.diskstorage.util.MetricInstrumentedStore.call(MetricInstrumentedStore.java:153)
at org.janusgraph.diskstorage.util.MetricInstrumentedStore.runWithMetrics(MetricInstrumentedStore.java:217)
at org.janusgraph.diskstorage.util.MetricInstrumentedStore.acquireLock(MetricInstrumentedStore.java:152)
at org.janusgraph.diskstorage.keycolumnvalue.KCVSProxy.acquireLock(KCVSProxy.java:52)
at org.janusgraph.diskstorage.BackendTransaction.acquireIndexLock(BackendTransaction.java:255)
at org.janusgraph.graphdb.database.StandardJanusGraph.prepareCommit(StandardJanusGraph.java:565)
at org.janusgraph.graphdb.database.StandardJanusGraph.commit(StandardJanusGraph.java:694)
... 7 more
Then a schema-related exception is thrown:
58030 [pool-10-thread-4] ERROR com.amazon.janusgraph.example.MarvelGraphFactory - Error processing line Adding this property for key [~T$SchemaName] and value [rtbrandtype] violates a uniqueness constraint [SystemIndex#~T$SchemaName]
org.janusgraph.core.SchemaViolationException: Adding this property for key [~T$SchemaName] and value [rtbrandtype] violates a uniqueness constraint [SystemIndex#~T$SchemaName]
at org.janusgraph.graphdb.transaction.StandardJanusGraphTx.addProperty(StandardJanusGraphTx.java:791)
at org.janusgraph.graphdb.transaction.StandardJanusGraphTx.addProperty(StandardJanusGraphTx.java:720)
at org.janusgraph.graphdb.transaction.StandardJanusGraphTx.makeSchemaVertex(StandardJanusGraphTx.java:847)
at org.janusgraph.graphdb.transaction.StandardJanusGraphTx.makePropertyKey(StandardJanusGraphTx.java:867)
at org.janusgraph.graphdb.types.StandardPropertyKeyMaker.make(StandardPropertyKeyMaker.java:100)
at com.amazon.janusgraph.creator.ObjectCreationCommand.run(ObjectCreationCommand.java:47)
at com.amazon.janusgraph.batch.BatchCommand.run(BatchCommand.java:34)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Finally, an exception is thrown that I really don't understand:
58512 [pool-10-thread-8] ERROR com.amazon.janusgraph.example.MarvelGraphFactory - Error processing line Could not find type for id: 11529
java.lang.IllegalStateException: Could not find type for id: 11529
at com.google.common.base.Preconditions.checkState(Preconditions.java:197)
at org.janusgraph.graphdb.types.vertices.JanusGraphSchemaVertex.name(JanusGraphSchemaVertex.java:59)
at org.janusgraph.graphdb.types.vertices.JanusGraphSchemaVertex.asIndexType(JanusGraphSchemaVertex.java:177)
at org.janusgraph.graphdb.database.management.ManagementSystem.getGraphIndexDirect(ManagementSystem.java:412)
at org.janusgraph.graphdb.database.management.ManagementSystem.getGraphIndex(ManagementSystem.java:422)
at com.amazon.janusgraph.creator.ObjectCreationCommand.run(ObjectCreationCommand.java:55)
at com.amazon.janusgraph.batch.BatchCommand.run(BatchCommand.java:34)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
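Because the final transaction is null, a NullPointerException is thrown and the transaction is never committed; as a result, my graph is initialized but empty.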
ObjectCreationCommand.java
package com.amazon.janusgraph.creator;
import com.amazon.janusgraph.example.TravelGraphFactory;
import com.codahale.metrics.MetricRegistry;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.janusgraph.core.JanusGraph;
import com.amazon.janusgraph.triple.Triple;
import org.janusgraph.core.Multiplicity;
import org.janusgraph.core.PropertyKey;
import org.janusgraph.core.schema.JanusGraphManagement;
import org.slf4j.Logger;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
public class ObjectCreationCommand implements Runnable {
    public static JanusGraph graph;
    private static Triple triple;
    private static MetricRegistry REGISTRY;
    public static Logger LOG;
    private static final String TIMER_LINE = "TravelGraphFactory.line";
    private static final String TIMER_CREATE = "TravelGraphFactory.create_";
    private static final String COUNTER_GET = "TravelGraphFactory.get_";

    public ObjectCreationCommand(JanusGraph graph, Triple triple, MetricRegistry REGISTRY, Logger LOG) {
        this.graph = graph;
        this.triple = triple;
        this.REGISTRY = REGISTRY;
        this.LOG = LOG;
    }

    @Override
    public void run() {
        JanusGraphManagement mgmt = graph.openManagement();
        if (mgmt.getGraphIndex(triple.getRightObjectProperty()) == null) {
            final PropertyKey hotelKey = mgmt.makePropertyKey(triple.getRightObjectProperty()).dataType(String.class).make();
            mgmt.buildIndex(triple.getRightObjectProperty(), Vertex.class).addKey(hotelKey).unique().buildCompositeIndex();
        }
        if (mgmt.getEdgeLabel(triple.getRelationship()) == null) {
            mgmt.makeEdgeLabel(triple.getRelationship()).multiplicity(Multiplicity.MANY2ONE).make();
        }
        if (mgmt.getGraphIndex(triple.getLeftObjectProperty()) == null) {
            final PropertyKey brandTypeKey = mgmt.makePropertyKey(triple.getLeftObjectProperty()).dataType(String.class).make();
            mgmt.buildIndex(triple.getLeftObjectProperty(), Vertex.class).addKey(brandTypeKey).unique().buildCompositeIndex();
        }
        mgmt.commit();
        long start = System.currentTimeMillis();
        String RIGHT_OBJECT_PROPERTY = triple.getRightObjectProperty();
        Vertex rightObject = graph.addVertex();
        rightObject.property(RIGHT_OBJECT_PROPERTY, triple.getRightObject());
        REGISTRY.counter(COUNTER_GET + RIGHT_OBJECT_PROPERTY).inc();
        String LEFT_OBJECT_PROPERTY = triple.getLeftObjectProperty();
        Vertex leftObject = graph.addVertex();
        rightObject.property(LEFT_OBJECT_PROPERTY, triple.getLeftObject());
        REGISTRY.counter(COUNTER_GET + LEFT_OBJECT_PROPERTY).inc();
        try {
            processRelationship(graph, triple);
        } catch (Throwable e) {
            Throwable rootCause = ExceptionUtils.getRootCause(e);
            String rootCauseMessage = null == rootCause ? "" : rootCause.getMessage();
            LOG.error("Error processing line {} {}", e.getMessage(), rootCauseMessage, e);
        }
        long end = System.currentTimeMillis();
        long time = end - start;
        REGISTRY.timer(TIMER_CREATE + RIGHT_OBJECT_PROPERTY).update(time, TimeUnit.MILLISECONDS);
    }

    private static void processRelationship(JanusGraph graph, Triple triple) {
        Vertex left = get(graph, triple.getLeftObjectProperty(), triple.getLeftObject());
        if (null == left) {
            REGISTRY.counter("error.missingLeftObject." + triple.getLeftObject()).inc();
            left = graph.addVertex();
            left.property(triple.getLeftObjectProperty(), triple.getLeftObject());
        }
        Vertex right = get(graph, triple.getRightObjectProperty(), triple.getRightObject());
        if (null == right) {
            REGISTRY.counter("error.missingRightObject." + triple.getRightObject()).inc();
            right = graph.addVertex();
            right.property(triple.getRightObjectProperty(), triple.getRightObject());
        }
        left.addEdge(triple.getRelationship(), right);
    }

    private static Vertex get(final JanusGraph graph, final String key, final String value) {
        final GraphTraversalSource g = graph.traversal();
        final Iterator<Vertex> it = g.V().has(key, value);
        return it.hasNext() ? it.next() : null;
    }
}
The exceptions above show that all of the errors come from lines 47, 55, or 59 of this class:
JanusGraphManagement mgmt = graph.openManagement();
if (mgmt.getGraphIndex(triple.getRightObjectProperty()) == null) {
[47] final PropertyKey hotelKey = mgmt.makePropertyKey(triple.getRightObjectProperty()).dataType(String.class).make();
mgmt.buildIndex(triple.getRightObjectProperty(), Vertex.class).addKey(hotelKey).unique().buildCompositeIndex();
}
if (mgmt.getEdgeLabel(triple.getRelationship()) == null) {
mgmt.makeEdgeLabel(triple.getRelationship()).multiplicity(Multiplicity.MANY2ONE).make();
}
[55] if (mgmt.getGraphIndex(triple.getLeftObjectProperty()) == null) {
final PropertyKey brandTypeKey = mgmt.makePropertyKey(triple.getLeftObjectProperty()).dataType(String.class).make();
mgmt.buildIndex(triple.getLeftObjectProperty(), Vertex.class).addKey(brandTypeKey).unique().buildCompositeIndex();
}
[59] mgmt.commit();
Can anyone help me figure out what I'm doing wrong in this class? Whatever I'm doing is causing locked tables and schema-creation problems.
As stated in the documentation, you should roll back any active transactions before trying to create an index. Have you tried that?
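First, your original run() method is executed for every line in the text file. If the first thing you do is roll back, the vertices and edges you created in the previous iteration are blown away.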
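Second, in the next lines, you try to set up the schema for each and every line, even though you only need to set up the schema once, right after you instantiate the graph. This is the source of your schema exceptions, because index names must be unique (and they do not have to be named after the properties they index).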
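One way to act on this is to scan all the triples once, collect the distinct property keys and edge labels, and only then create the schema in a single management transaction before any worker thread starts. The sketch below covers only the collection step, in plain Java; `SchemaPlan` and the `by_` index-name prefix are hypothetical names chosen for illustration, and the JanusGraph calls mentioned in the comments are the ones already used in the question.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: gathers every distinct schema element up front so the
// schema can be created once, instead of once per input line.
final class SchemaPlan {
    private final Set<String> propertyKeys = new LinkedHashSet<>();
    private final Set<String> edgeLabels = new LinkedHashSet<>();

    // Each row is assumed to be {leftProperty, relationship, rightProperty}.
    static SchemaPlan from(List<String[]> rows) {
        SchemaPlan plan = new SchemaPlan();
        for (String[] row : rows) {
            plan.propertyKeys.add(row[0]);
            plan.edgeLabels.add(row[1]);
            plan.propertyKeys.add(row[2]);
        }
        return plan;
    }

    Set<String> propertyKeys() {
        return Collections.unmodifiableSet(propertyKeys);
    }

    Set<String> edgeLabels() {
        return Collections.unmodifiableSet(edgeLabels);
    }

    // Index names must be unique but need not equal the property name;
    // the "by_" prefix is an arbitrary convention for this sketch.
    static String indexNameFor(String propertyKey) {
        return "by_" + propertyKey;
    }
}
```

With the plan in hand, a single-threaded setup step could loop over `propertyKeys()` and `edgeLabels()`, make the `makePropertyKey`, `buildIndex`, and `makeEdgeLabel` calls from the question once each, and call `mgmt.commit()` a single time, so no two transactions compete for the same schema lock.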
Third, at lines 78 and 83, you assume that you are creating the vertices anew for every line of the text file you process. The uniqueness constraint applies there just as much as it does at lines 107 and 114, where you respect the uniqueness constraint when processing relationships.
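The get-or-create pattern that processRelationship already follows can be shown in isolation. The sketch below is an in-memory stand-in, not JanusGraph code: `UniqueVertexRegistry` is a hypothetical class that hands out one id per (property key, value) pair, which is the behaviour a unique index expects from the loader.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical in-memory stand-in for "look the vertex up by its unique
// property first, and create it only when it is missing".
final class UniqueVertexRegistry {
    private final Map<String, Long> idsByKeyAndValue = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong(1);

    // Returns the existing id for this (propertyKey, value) pair, or
    // allocates a new one atomically if the pair has not been seen yet.
    long getOrCreate(String propertyKey, String value) {
        // A tab is used as the separator on the assumption that neither
        // part contains one, since the input file is tab-separated.
        String lookup = propertyKey + "\t" + value;
        return idsByKeyAndValue.computeIfAbsent(lookup, k -> nextId.getAndIncrement());
    }

    int size() {
        return idsByKeyAndValue.size();
    }
}
```

Against the real graph, the equivalent step is the question's own `get(graph, key, value)` lookup followed by `graph.addVertex()` only when the lookup returns null, applied everywhere a vertex is created rather than only inside processRelationship.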
Finally, setting up batching and executors is overkill for a graph with about 100 vertices and about 100 edges. I submitted a PR that fixes all of these issues and proposes two different ways to handle the data load at hand.