Using apoc.periodic.commit to insert an endless json stream into neo4j
I'm new to Neo4j and am trying to insert data from a JSON stream into the database. The root element of the JSON stream is an array, and each element of the array is an object containing a key/value pair and an array.
Example of the JSON stream:
[
{
"access_point":4864834,
"objects": [
{"class_id":10, "name":"iphone", "snr":0.557461},
{"class_id":7, "name":"android", "snr":0.822390},
{"class_id":7, "name":"android", "snr":0.320850},
{"class_id":2, "name":"pc", "snr":0.915604}
]
},
{
"access_point":4864835,
"objects": [
{"class_id":12, "name":"iphone", "snr":0.268736},
{"class_id":10, "name":"android", "snr":0.585927},
{"class_id":7, "name":"android", "snr":0.821383},
{"class_id":2, "name":"pc", "snr":0.254997},
{"class_id":7, "name":"android", "snr":0.326559},
{"class_id":2, "name":"pc", "snr":0.905473}
]
},
Because it is an endless stream, I need to do batched commits, since apoc.load.json will never reach the end of the array.
My query so far looks like:
CALL apoc.periodic.commit("
CALL apoc.load.json('https://raw.githubusercontent.com/jdharri/testjson/master/test.json','$[*]')
YIELD value as accesspoint MERGE(f:Accesspoint {id: accesspoint.access_point, name: accesspoint.access_point})
FOREACH(object IN accesspoint.objects | MERGE (f)-[r:OBSERVED]->(:Object {class_id:object.class_id, name:object.name, access_point_id:accesspoint.access_point}))",
{limit:10, batchSize: 10});
This, of course, isn't referencing a JSON stream, but static JSON in my github.
Is there a way to tell it to persist after n elements of the array?
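(For context: apoc.periodic.commit is built around a shrinking workload. It repeatedly re-runs its inner statement with a $limit parameter and only stops once the statement reports that nothing was processed, so it suits batched cleanups rather than an endless source. A minimal sketch of the usual pattern, using a hypothetical :Temp label that is not part of this question:)

CALL apoc.periodic.commit(
  "MATCH (n:Temp)               // hypothetical nodes to clean up
   WITH n LIMIT $limit          // consume at most $limit rows per batch
   DETACH DELETE n
   RETURN count(*)",            // procedure re-runs the statement until this returns 0
  {limit: 1000});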
It looks like you should be using apoc.periodic.iterate instead of apoc.periodic.commit. For example:
CALL apoc.periodic.iterate(
"CALL apoc.load.json('https://raw.githubusercontent.com/jdharri/testjson/master/test.json','$[*]')
YIELD value AS ap",
"MERGE(f:Accesspoint {id: ap.access_point, name: ap.access_point})
FOREACH(obj IN ap.objects |
MERGE (f)-[r:OBSERVED]->(:Object {class_id:obj.class_id, name:obj.name, access_point_id:ap.access_point}))",
{batchSize: 10});
apoc.periodic.iterate is documented to support the batchSize option, which processes N executions of the second Cypher statement in a single transaction.
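To make the batching behaviour concrete, here is a minimal, self-contained sketch (the range-based driving query and the :Example label are just illustrations, not part of the original question):

CALL apoc.periodic.iterate(
  // driving statement: produces one row per unit of work
  "UNWIND range(1, 100) AS n RETURN n",
  // action statement: executed once per row from the driving statement
  "CREATE (:Example {value: n})",
  // with batchSize: 10, every 10 executions are committed in one transaction
  {batchSize: 10});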
Because I had access to the data source, we were able to change how it outputs JSON. We switched it to JSONL (line-delimited JSON), where each line of JSON is essentially treated as its own JSON document. I did leverage a lot of @cybersam's answer, and also Michael Hunger's, so thank you.
Changed the source JSON to JSONL like so:
{"access_point":4864834, "objects": [{"class_id":10, "name":"iphone", "snr":0.557461}, {"class_id":7, "name":"android", "snr":0.822390}, {"class_id":7, "name":"android", "snr":0.320850}, {"class_id":2, "name":"pc", "snr":0.915604}]}
{"access_point":4864835, "objects": [{"class_id":12, "name":"iphone", "snr":0.268736}, {"class_id":10, "name":"android", "snr":0.585927}, {"class_id":7, "name":"android", "snr":0.821383}]}
My neo4j Cypher query looks like:
CALL apoc.periodic.iterate(
"CALL apoc.load.jsonArray('http://13.68.174.185:8899/',null)
YIELD value AS ap",
"MERGE (f:AccessPoint {id: ap.frame_id, name: ap.access_point_id})
FOREACH(obj IN ap.objects |
MERGE (f)-[r:OBSERVED]->(:Object {class_id:obj.class_id, name:obj.name, access_point_id:ap.ap_id}))",
{batchSize: 1});
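One follow-up worth noting: since the import MERGEs :AccessPoint nodes by id, a uniqueness constraint (which also backs the lookups with an index) keeps those MERGEs fast and prevents duplicate nodes. A minimal sketch; the constraint name is arbitrary, and the syntax depends on your Neo4j version:

// Neo4j 4.4+ / 5.x syntax
CREATE CONSTRAINT access_point_id IF NOT EXISTS
FOR (a:AccessPoint) REQUIRE a.id IS UNIQUE;

// Neo4j 3.x syntax:
// CREATE CONSTRAINT ON (a:AccessPoint) ASSERT a.id IS UNIQUE;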