Using apoc.periodic.commit to insert an endless JSON stream into Neo4j

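I am new to Neo4j and am trying to insert data from a JSON stream into the database. The root element of the JSON stream is an array, and each element of that array is an object containing one key/value pair and one array.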

An example of the JSON stream:

[
{
 "access_point":4864834, 
 "objects": [ 
  {"class_id":10, "name":"iphone", "snr":0.557461}, 
  {"class_id":7, "name":"android", "snr":0.822390}, 
  {"class_id":7, "name":"android", "snr":0.320850}, 
  {"class_id":2, "name":"pc", "snr":0.915604}
 ] 
}, 
{
 "access_point":4864835, 
 "objects": [ 
  {"class_id":12, "name":"iphone", "snr":0.268736}, 
  {"class_id":10, "name":"android", "snr":0.585927}, 
  {"class_id":7, "name":"android", "snr":0.821383}, 
  {"class_id":2, "name":"pc", "snr":0.254997}, 
  {"class_id":7, "name":"android", "snr":0.326559}, 
  {"class_id":2, "name":"pc", "snr":0.905473}
 ] 
}, 

Because it is an endless stream, I need to commit in batches: apoc.load.json will never reach the end of the array.

My query so far looks like this:

CALL apoc.periodic.commit("
CALL apoc.load.json('https://raw.githubusercontent.com/jdharri/testjson/master/test.json','$[*]')
YIELD value as accesspoint MERGE(f:Accesspoint {id: accesspoint.access_point, name: accesspoint.access_point})
FOREACH(object IN accesspoint.objects | MERGE (f)-[r:OBSERVED]->(:Object {class_id:object.class_id, name:object.name, access_point_id:accesspoint.access_point}))",
{limit:10, batchSize: 10});

This, of course, does not reference the JSON stream; it references a static JSON file on my GitHub.

Is there a way to tell it to persist after every n elements of the array?

It looks like you should use apoc.periodic.iterate instead of apoc.periodic.commit. For example:

CALL apoc.periodic.iterate(
  "CALL apoc.load.json('https://raw.githubusercontent.com/jdharri/testjson/master/test.json','$[*]')
YIELD value AS ap",
  "MERGE(f:Accesspoint {id: ap.access_point, name: ap.access_point})
   FOREACH(obj IN ap.objects |
     MERGE (f)-[r:OBSERVED]->(:Object {class_id:obj.class_id, name:obj.name, access_point_id:ap.access_point}))",
{batchSize: 10});

apoc.periodic.iterate is documented as supporting a batchSize option, which processes N executions of the second Cypher statement in a single transaction.
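To make the batching idea concrete, here is a rough Python sketch of the concept only; it is not how APOC is implemented. Rows from a possibly endless source are grouped into chunks of N, and each chunk would then be written in one transaction. The name `batched` and the use of `itertools.count()` as a stand-in for the stream are assumptions made for illustration.

```python
from itertools import count, islice

def batched(rows, batch_size):
    """Yield lists of up to batch_size rows, reading the source lazily."""
    it = iter(rows)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return  # source exhausted (never happens for an endless stream)
        yield batch

# count() never ends, yet each batch is available as soon as it is full.
first_two = list(islice(batched(count(), 3), 2))
```

Because the source is read lazily, work can be committed after every batch without ever needing to see the end of the input.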

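Because I have access to the data source, we were able to change the way it outputs JSON. We switched it to JSONL (line-delimited JSON), where each line is essentially treated as its own JSON document. I drew heavily on @cybersam's answer, and on Michael Hunger as well, so thank you.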

The source JSON was changed to JSONL, as follows:


{"access_point":4864834, "objects": [{"class_id":10, "name":"iphone", "snr":0.557461}, {"class_id":7, "name":"android", "snr":0.822390}, {"class_id":7, "name":"android", "snr":0.320850}, {"class_id":2, "name":"pc", "snr":0.915604}]}
{"access_point":4864835, "objects": [{"class_id":12, "name":"iphone", "snr":0.268736}, {"class_id":10, "name":"android", "snr":0.585927}, {"class_id":7, "name":"android", "snr":0.821383}]}
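For anyone who needs to make the same change on the producer side, a minimal Python sketch of writing and reading this format might look like the following. It assumes the records are available as dictionaries; the function names `to_jsonl_line` and `read_jsonl` are hypothetical and are not part of the actual service.

```python
import json

def to_jsonl_line(record):
    """Serialize one record as a single line of JSON.

    json.dumps without an indent argument emits no raw newlines, so each
    record stays on its own line.
    """
    return json.dumps(record)

def read_jsonl(lines):
    """Parse each non-empty line as an independent JSON document."""
    for line in lines:
        line = line.strip()
        if line:
            yield json.loads(line)
```

Since every line is a complete document, a consumer can process a record as soon as its line arrives instead of waiting for a closing bracket.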

My Neo4j Cypher query looks like this:

CALL apoc.periodic.iterate(
"CALL apoc.load.jsonArray('http://13.68.174.185:8899/',null)
YIELD value AS ap",
"MERGE(f:AccessPoint {id: ap.access_point, name: ap.access_point})
FOREACH(obj IN ap.objects |
  MERGE (f)-[r:OBSERVED]->(:Object {class_id:obj.class_id, name:obj.name, access_point_id:ap.access_point}))",
{batchSize: 1});