通过 Py2neo 在 Neo4j 中建立关系非常慢
Building relationships in Neo4j via Py2neo is very slow
我们在数据库中有 5 种不同类型的节点。最大的有~290k,最小的只有~3k。每个节点类型都有一个 id 字段,它们都被索引了。我正在使用 py2neo
建立关系,但速度很慢(每秒插入约 2 个关系)
我使用 pandas
从关系 csv 中读取,迭代每一行以创建包含在事务中的关系。我尝试在一个事务中批量处理 10k 个创建语句,但速度似乎并没有提高很多。
代码如下:
df = pd.read_csv(r"C:\relationship.csv",dtype = datatype, skipinitialspace=True, usecols=fields)
df.fillna('',inplace=True)
def f(node_1 ,rel_type, node_2):
try:
tx = graph.begin()
tx.evaluate('MATCH (a {node_id:$label1}),(b {node_id:$label2}) MERGE (a)-[r:'+rel_type+']->(b)',
parameters = {'label1': node_1, 'label2': node_2})
tx.commit()
except Exception as e:
print(str(e))
for index, row in df.iterrows():
if(index%1000000 == 0):
print(index)
try:
f(row["node_1"],row["rel_type"],row["node_2"])
except:
print("error index: " + index)
谁能帮我看看我做错了什么。谢谢!
您声明有“5 种不同类型的节点”(我将其解释为 5 个节点 标签 ,在 neo4j 术语中)。此外,您还声明它们的 id
属性已编入索引。
但是您的 f()
函数根本没有生成使用标签的 Cypher 查询,它也不使用 id
属性。为了利用您的索引,您的 Cypher 查询必须指定节点标签和 id
值。
由于目前在执行 MATCH
时没有有效的方法来参数化标签,f()
函数的以下版本生成一个 Cypher 查询,该查询具有硬编码标签(以及硬编码关系类型):
def f(label_1, id_1, rel_type, label_2, id_2):
try:
tx = graph.begin()
tx.evaluate(
'MATCH' +
'(a:' + label_1 + '{id:$id1}),' +
'(b:' + label_2 + '{id:$id2}) ' +
'MERGE (a)-[r:'+rel_type+']->(b)',
parameters = {'id1': id_1, 'id2': id_2})
tx.commit()
except Exception as e:
print(str(e))
调用 f()
的代码也必须更改为传入标签名称和 a
和 b
的 id
值。希望您的 df
行将包含该数据(或足够的信息供您导出该数据)。
如果您的目标是获得更好的性能,那么您将需要考虑一种不同的加载模式,即批处理。您目前 运行 每个关系都有一个 Cypher MERGE 语句,并将其包装在单独的函数调用中的自己的事务中。
通过查看每个事务或每个函数调用的多个语句来进行批处理将减少网络跃点数并提高性能。
我们在数据库中有 5 种不同类型的节点。最大的有~290k,最小的只有~3k。每个节点类型都有一个 id 字段,它们都被索引了。我正在使用 py2neo
建立关系,但速度很慢(每秒插入约 2 个关系)
我使用 pandas
从关系 csv 中读取,迭代每一行以创建包含在事务中的关系。我尝试在一个事务中批量处理 10k 个创建语句,但速度似乎并没有提高很多。
代码如下:
df = pd.read_csv(r"C:\relationship.csv",dtype = datatype, skipinitialspace=True, usecols=fields)
df.fillna('',inplace=True)
def f(node_1 ,rel_type, node_2):
try:
tx = graph.begin()
tx.evaluate('MATCH (a {node_id:$label1}),(b {node_id:$label2}) MERGE (a)-[r:'+rel_type+']->(b)',
parameters = {'label1': node_1, 'label2': node_2})
tx.commit()
except Exception as e:
print(str(e))
for index, row in df.iterrows():
if(index%1000000 == 0):
print(index)
try:
f(row["node_1"],row["rel_type"],row["node_2"])
except:
print("error index: " + index)
谁能帮我看看我做错了什么。谢谢!
您声明有“5 种不同类型的节点”(我将其解释为 5 个节点 标签 ,在 neo4j 术语中)。此外,您还声明它们的 id
属性已编入索引。
但是您的 f()
函数根本没有生成使用标签的 Cypher 查询,它也不使用 id
属性。为了利用您的索引,您的 Cypher 查询必须指定节点标签和 id
值。
由于目前在执行 MATCH
时没有有效的方法来参数化标签,f()
函数的以下版本生成一个 Cypher 查询,该查询具有硬编码标签(以及硬编码关系类型):
def f(label_1, id_1, rel_type, label_2, id_2):
try:
tx = graph.begin()
tx.evaluate(
'MATCH' +
'(a:' + label_1 + '{id:$id1}),' +
'(b:' + label_2 + '{id:$id2}) ' +
'MERGE (a)-[r:'+rel_type+']->(b)',
parameters = {'id1': id_1, 'id2': id_2})
tx.commit()
except Exception as e:
print(str(e))
调用 f()
的代码也必须更改为传入标签名称和 a
和 b
的 id
值。希望您的 df
行将包含该数据(或足够的信息供您导出该数据)。
如果您的目标是获得更好的性能,那么您将需要考虑一种不同的加载模式,即批处理。您目前 运行 每个关系都有一个 Cypher MERGE 语句,并将其包装在单独的函数调用中的自己的事务中。
通过查看每个事务或每个函数调用的多个语句来进行批处理将减少网络跃点数并提高性能。