OrientDB ETL,在两个已经在 Graph 中的顶点之间创建边

OrientDB ETL, create edge between two vertices which are already in Graph

我正在尝试在两个顶点之间创建一条边,这两个顶点已经是 OreintDB 的一部分。我的边缘数据在 MySQL table 中。

这是我的 oetl json。

    {
      "config": {
        "log": "info"
      },
      "source": { "file": { "path": "/Users/RP/user_invited_data.csv" } },
  "extractor": { "csv": {"columnsOnFirstLine": true, "columns":["user_id:string", "invited_by:string", "invited_date:datetime"] } },
      "transformers" : [
       { "vertex": { "class": "User", "skipDuplicates": true} },
        { "edge": { "class": "INVITED", "direction" : "in", 
                "joinFieldName": "invited_by",
                "lookup":"select expand(u) from (match {class: User, as: u} return u) where u.user_id = ?;", 
                "unresolvedLinkAction":"NOTHING",
                "edgeFields": { "invited_date": "${input.invited_date}" },
                "skipDuplicates": true
               } 
        },
        { "field": 
          { "fieldNames": 
            [ "invited_by", "invited_date"], 
            "operation": "remove"
          } 
      }
      ],
      "loader" : {
        "orientdb": {
          "dbURL": "remote:localhost/abcd_graph",
          "dbUser": "root",
          "dbPassword": "root",
          "dbType": "graph",
          "dbAutoCreate": false,
          "batchCommit": 1000
        }
      }
    }

当我运行上面的json时,它正在为用户顶点抛出ORecordDuplicatedException。我在 user_id 上创建了一个唯一索引,并且有 skipDuplicates = true。任何建议将不胜感激。

更新: OrientDB的Gem,skipDuplicates实际上在你的log水平是而不是DEBUG的时候有效。但是问题还没有解决。现在没有错误,但没有创建边缘。我会继续调试,看看今晚能不能修好。

更新 经过更深入的调试后,我在存储级别更深的地方发现了一个异常。

com.orientechnologies.orient.core.exception.ODatabaseException: Impossible to serialize invalid link #-1:-1
    DB name="abcd_graph"
    at com.orientechnologies.orient.core.serialization.serializer.record.binary.ORecordSerializerBinaryV0.writeOptimizedLink(ORecordSerializerBinaryV0.java:867)
    at com.orientechnologies.orient.core.serialization.serializer.record.binary.ORecordSerializerBinaryV0.serializeValue(ORecordSerializerBinaryV0.java:754)
    at com.orientechnologies.orient.core.serialization.serializer.record.binary.ORecordSerializerBinaryV0.serialize(ORecordSerializerBinaryV0.java:385)
    at com.orientechnologies.orient.core.serialization.serializer.record.binary.ORecordSerializerBinary.toStream(ORecordSerializerBinary.java:99)
    at com.orientechnologies.orient.core.record.impl.ODocument.toStream(ODocument.java:2381)
    at com.orientechnologies.orient.core.record.impl.ODocument.toStream(ODocument.java:664)
    at com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx.executeSaveRecord(ODatabaseDocumentTx.java:2183)
    at com.orientechnologies.orient.core.tx.OTransactionNoTx.saveRecord(OTransactionNoTx.java:191)
    at com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx.save(ODatabaseDocumentTx.java:2758)
    at com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx.save(ODatabaseDocumentTx.java:102)
    at com.orientechnologies.orient.core.record.impl.ODocument.save(ODocument.java:1805)
    at com.orientechnologies.orient.core.record.impl.ODocument.save(ODocument.java:1801)
    at com.tinkerpop.blueprints.impls.orient.OrientGraphNoTx.addEdgeInternal(OrientGraphNoTx.java:242)
    at com.tinkerpop.blueprints.impls.orient.OrientGraphNoTx.addEdgeInternal(OrientGraphNoTx.java:137)
    at com.tinkerpop.blueprints.impls.orient.OrientVertex.addEdge(OrientVertex.java:741)
    at com.tinkerpop.blueprints.impls.orient.OrientVertex.addEdge(OrientVertex.java:688)
    at com.orientechnologies.orient.etl.transformer.OEdgeTransformer.createEdge(OEdgeTransformer.java:203)
    at com.orientechnologies.orient.etl.transformer.OEdgeTransformer.executeTransform(OEdgeTransformer.java:123)
    at com.orientechnologies.orient.etl.transformer.OAbstractTransformer.transform(OAbstractTransformer.java:39)
    at com.orientechnologies.orient.etl.OETLPipeline.execute(OETLPipeline.java:110)
    at com.orientechnologies.orient.etl.OETLProcessor$OETLPipelineWorker.call(OETLProcessor.java:620)
    at com.orientechnologies.orient.etl.OETLProcessor$OETLPipelineWorker.call(OETLProcessor.java:601)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

更新 我已经将提取器从 DB 更改为 CSV,以便更容易重现。

创建架构:

CREATE class User IF NOT EXISTS extends V;
create property User.user_id IF NOT EXISTS String;
create property User.name IF NOT EXISTS String;
create index user_idx on User(user_id) unique;

insert into User set user_id = '1000_USER1', name = 'Bob';
insert into User set user_id = '1001_USER2', name = 'Robert';

CSV 样本:

user_id, ivited_by, invited_date
1001_USER2, 1000_USER1,

经过一番努力,重新阅读了整个 ETL 文档并进行了一些调试,我明白了。

我们需要使用 MERG 转换器而不是 VERTEX。合并转换器将查找 Vertex 而不是创建它。

这是我的json样子

"transformers" : [
    { "merge": { "joinFieldName": "user_id", "lookup": "User.user_id" } },
    { "edge": { "class": "INVITED", "direction" : "out", 
            "joinFieldName": "invited_by",
            "lookup": "SELECT expand(u) from (match {class: User, as: u} return u) where u.user_id = ?", 
            "unresolvedLinkAction":"NOTHING",
            "edgeFields": { "invited_date": "${input.invited_date}" },
            "skipDuplicates": true
           } 
    },
    { "field": 
      { "fieldNames": 
        [ "invited_by", "invited_date"], 
        "operation": "remove"
      } 
  }
  ]

我还有一个问题,但我会把它作为一个单独的事情来研究它。问题是它在相同的两个顶点之间创建了重复的边

我会把它作为一个单独的问题来解决。

我一直观察到 OrientDB 的一件事是,东西在那里,但很难弄清楚。