使用py2neo上传数据的最佳方式
Best way to upload data using py2neo
我尝试过使用 py2neo 上传中型数据集的方法。在我的例子中,每天大约有 8 万个节点和 40 万个边需要加载。我想分享一下我的经验,问问社区还有没有更好的方法我没有遇到。
一个。 py2neo 的“本机”命令。
使用 graph.merge_one()
创建节点并使用 push()
设置属性。
我很快就忽略了它,因为它非常慢,甚至在几分钟内都不会超过 10 K 条记录。毫不奇怪,py2neo' documentation 和这里的一些帖子推荐使用 Cypher。
乙。没有分区的密码
在循环中使用py2neo.cypher.CypherTransaction
append()
,在结束时使用commit()
。
# query sent to MSSQL. Returns ~ 80K records
result = engine.execute(query)
statement = "MERGE (e:Entity {myid: {ID}}) SET e.p = 1"
# begin new Cypher transaction
tx = neoGraph.cypher.begin()
for row in result:
tx.append(statement, {"ID": row.id_field})
tx.commit()
这会超时并导致 Neo4j 服务器崩溃。
我知道问题是所有 80 K Cypher 语句都试图一次性执行。
C。带分区和一次提交的 Cypher
我使用计数器和 process()
命令一次 运行 1000 个语句。
# query sent to MSSQL. Returns ~ 80K records
result = engine.execute(query)
statement = "MERGE (e:Entity {myid: {ID}}) SET e.p = 1"
counter = 0
tx = neoGraph.cypher.begin()
for row in result:
counter += 1
tx.append(statement, {"ID": row.id_field})
if (counter == 1000):
tx.process() # process 1000 statements
counter = 0
tx.commit()
这 运行 一开始速度很快,但随着处理 1000 笔交易而变慢。最终,它在堆栈溢出中超时。
这令人惊讶,因为我预计 process()
每次都会重置堆栈。
D.带分区的 Cypher 和每个分区的提交
这是唯一运行良好的版本。对 1000 个事务的每个分区执行 commit()
并使用 begin()
.
重新开始一个新事务
# query sent to MSSQL. Returns ~ 80K records
result = engine.execute(query)
statement = "MERGE (e:Entity {myid: {ID}}) SET e.p = 1"
counter = 0
tx = neoGraph.cypher.begin()
for row in result:
counter += 1
tx.append(statement, {"ID": row.id_field})
if (counter == 1000):
tx.commit() # commit 1000 statements
tx = neoGraph.cypher.begin() # reopen transaction
counter = 0
tx.commit()
这 运行 又快又好。
有意见吗?
正如您通过反复试验发现的那样,当单个事务的操作不超过大约 10K-50K 时,它的性能最佳。您在 D 中描述的方法效果最好,因为您每 1000 个语句提交一次事务。您或许可以安全地增加批量大小。
您可能想尝试的另一种方法是将一组值作为参数传递,并使用 Cypher 的 UNWIND
命令迭代它们。例如:
WITH {id_array} AS ids // something like [1,2,3,4,5,6]
UNWIND ids AS ident
MERGE (e:Entity {myid: ident})
SET e.p = 1
我尝试过使用 py2neo 上传中型数据集的方法。在我的例子中,每天大约有 8 万个节点和 40 万个边需要加载。我想分享一下我的经验,问问社区还有没有更好的方法我没有遇到。
一个。 py2neo 的“本机”命令。
使用 graph.merge_one()
创建节点并使用 push()
设置属性。
我很快就忽略了它,因为它非常慢,甚至在几分钟内都不会超过 10 K 条记录。毫不奇怪,py2neo' documentation 和这里的一些帖子推荐使用 Cypher。
乙。没有分区的密码
在循环中使用py2neo.cypher.CypherTransaction
append()
,在结束时使用commit()
。
# query sent to MSSQL. Returns ~ 80K records
result = engine.execute(query)
statement = "MERGE (e:Entity {myid: {ID}}) SET e.p = 1"
# begin new Cypher transaction
tx = neoGraph.cypher.begin()
for row in result:
tx.append(statement, {"ID": row.id_field})
tx.commit()
这会超时并导致 Neo4j 服务器崩溃。 我知道问题是所有 80 K Cypher 语句都试图一次性执行。
C。带分区和一次提交的 Cypher
我使用计数器和 process()
命令一次 运行 1000 个语句。
# query sent to MSSQL. Returns ~ 80K records
result = engine.execute(query)
statement = "MERGE (e:Entity {myid: {ID}}) SET e.p = 1"
counter = 0
tx = neoGraph.cypher.begin()
for row in result:
counter += 1
tx.append(statement, {"ID": row.id_field})
if (counter == 1000):
tx.process() # process 1000 statements
counter = 0
tx.commit()
这 运行 一开始速度很快,但随着处理 1000 笔交易而变慢。最终,它在堆栈溢出中超时。
这令人惊讶,因为我预计 process()
每次都会重置堆栈。
D.带分区的 Cypher 和每个分区的提交
这是唯一运行良好的版本。对 1000 个事务的每个分区执行 commit()
并使用 begin()
.
# query sent to MSSQL. Returns ~ 80K records
result = engine.execute(query)
statement = "MERGE (e:Entity {myid: {ID}}) SET e.p = 1"
counter = 0
tx = neoGraph.cypher.begin()
for row in result:
counter += 1
tx.append(statement, {"ID": row.id_field})
if (counter == 1000):
tx.commit() # commit 1000 statements
tx = neoGraph.cypher.begin() # reopen transaction
counter = 0
tx.commit()
这 运行 又快又好。
有意见吗?
正如您通过反复试验发现的那样,当单个事务的操作不超过大约 10K-50K 时,它的性能最佳。您在 D 中描述的方法效果最好,因为您每 1000 个语句提交一次事务。您或许可以安全地增加批量大小。
您可能想尝试的另一种方法是将一组值作为参数传递,并使用 Cypher 的 UNWIND
命令迭代它们。例如:
WITH {id_array} AS ids // something like [1,2,3,4,5,6]
UNWIND ids AS ident
MERGE (e:Entity {myid: ident})
SET e.p = 1