etl 和多对多关系

etl and many to many relation

在文档中,来自 csv 的 etl 使用一对多功能,我想将它扩展到多对多。所以我做了 3 个配置,一个用于 post,一个用于评论,一个用于关系。 Post 和 Comment 都可以,但是当我启动关系时出现此错误,我做错了什么?

    commentId,postId
    0,10
    1,10
    21,10
    41,20
    82,20    

    {
     "source": { "file": { "path": "/tmp/relation.csv" } },
     "extractor": { "csv": {} },
       "transformers": [
        { "edge":
            { "class": "HasComments", "joinFieldName": "postId", "lookup": "Post.id", "direction": "out"},
            { "class": "HasComments", "joinFieldName": "commentId", "lookup": "Comment.id", "direction": "in"}
        }
      ],
      "loader": {
        "orientdb": {
           "dbURL": "plocal:/tmp/test",
           "dbType": "graph",
           "classes": [
             {"name": "Post", "extends": "V"},
             {"name": "Comment", "extends": "V"},
             {"name": "HasComments", "extends": "E"}
           ],
           "indexes": [
             {"class":"Post", "fields":["id:integer"], "type":"UNIQUE" },
             {"class":"Comment", "fields":["id:integer"], "type":"UNIQUE" }
           ]
        }
      }
    }

    OrientDB etl v.2.1.9-SNAPSHOT (build 2.1.x@r; 2016-01-07 10:51:24+0000) www.orientdb.com
    BEGIN ETL PROCESSOR
    [file] INFO Reading from file /tmp/relation.csv with encoding UTF-8
    Error in Pipeline execution: com.orientechnologies.orient.etl.transformer.OTransformException: edge: input type 'com.orientechnologies.orient.core.record.impl.ODocument@72ade7e3' is not supported
    ETL process halted: com.orientechnologies.orient.etl.OETLProcessHaltedException: Halt
    Exception in thread "main" com.orientechnologies.orient.etl.OETLProcessHaltedException: Halt
    at com.orientechnologies.orient.etl.OETLPipeline.execute(OETLPipeline.java:149)
    at com.orientechnologies.orient.etl.OETLProcessor.executeSequentially(OETLProcessor.java:448)
    at com.orientechnologies.orient.etl.OETLProcessor.execute(OETLProcessor.java:255)
    at com.orientechnologies.orient.etl.OETLProcessor.main(OETLProcessor.java:109)
    Caused by: com.orientechnologies.orient.etl.transformer.OTransformException: edge: input type 'com.orientechnologies.orient.core.record.impl.ODocument@72ade7e3' is not supported
    at com.orientechnologies.orient.etl.transformer.OEdgeTransformer.executeTransform(OEdgeTransformer.java:107)
    at com.orientechnologies.orient.etl.transformer.OAbstractTransformer.transform(OAbstractTransformer.java:37)
    at com.orientechnologies.orient.etl.OETLPipeline.execute(OETLPipeline.java:115)
    ... 3 more

一个可能的解决方案是,在通过 json 文件导入 Post 和 Comment classes 之后,您可以使用另一个 json 文件并导入 class 关系

{
    "source": { "file": { "path": "/tmp/relation.csv" } },
    "extractor": { "row": {} },
    "transformers": [
        { "csv": { "separator": ","} 
        },
        { "vertex": { "class": "Relation" } }
    ],
    "loader": {
        "orientdb": {
           "dbURL": "plocal:/tmp/test",
           "dbType": "graph",
            "classes": [
                {"name": "Post", "extends": "V"},
                {"name": "Comment", "extends": "V"},
                {"name": "Relation", "extends": "V"},
                {"name": "HasComments", "extends": "E"}
            ],
            "indexes": [
                {"class":"Post", "fields":["id:integer"], "type":"UNIQUE" },
                {"class":"Comment", "fields":["id:integer"], "type":"UNIQUE" }
            ]
        }
    }
}

您将获得这些记录。

使用以下 javascript 函数

var g=orient.getGraphNoTx();
var relation = g.command("sql","select from Relation");
for(i=0;i<relation.length;i++){
var relationMM=g.command("sql","select postId , commentId from "+ relation[i].getId());
var idPost=relationMM[0].getProperty("postId");
var idComment=relationMM[0].getProperty("commentId");
var post=g.command("sql","select from Post where id = " + idPost);
var comment=g.command("sql","select from Comment where id = " + idComment);
g.command("sql","create edge HasComments from " + post[0].getId() + " to " + comment[0].getId());
}
g.command("sql","drop class Relation unsafe");

您将得到以下结构。

这将是您的图表。

更新

您可以使用此代码检查边是否已存在

var counter=g.command("sql","select count(*) from HasComments where out=" + post[0].getId() + " and in=" + comment[0].getId());
if(counter[0].getProperty("count")==0){
    g.command("sql","create edge HasComments from " + post[0].getId() + " to " + comment[0].getId());
}