使用 python 将数据导入 Neo4j 时挂起
Hangs while importing data into Neo4j with python
我正在尝试将 lastfm360K 数据库导入 Neo4j 数据库。我首先将所有用户作为节点插入,下面的代码没有任何问题
import re
from datetime import datetime
from elasticsearch import Elasticsearch
import certifi
from neo4j.v1 import GraphDatabase, basic_auth
driver = GraphDatabase.driver("bolt://localhost", auth=basic_auth("neo4j", "pass"))
session = driver.session()
with open("/Users/inanc/Documents/Software/lastfm-dataset-360K/usersha1-profile.tsv" , 'r') as userFile:
#first_line = userFile.readline()
linenum = 0
for line in userFile:
linenum = linenum + 1
if linenum % 1000 == 0:
print(linenum)
lineStrip = line.rstrip().split("\t")
tempDict = {}
tempDict["user_id"] = lineStrip[0]
if len(lineStrip) > 1:
tempDict["gender"] = lineStrip[1]
if lineStrip[2] != "":
tempDict["age"] = int(lineStrip[2])
tempDict["country"] = lineStrip[3]
tempDict["signup"] = lineStrip[4]
session.run("CREATE (a:Person {dict})", {"dict": tempDict})
session.close()
然后我想添加艺术家节点和与用户的关系如下
import re
from datetime import datetime
from elasticsearch import Elasticsearch
import certifi
from neo4j.v1 import GraphDatabase, basic_auth
driver = GraphDatabase.driver("bolt://localhost", auth=basic_auth("neo4j", "pass"))
session = driver.session()
linenum = 0
with open("/Users/inanc/Documents/Software/lastfm-dataset-360K/usersha1-artmbid-artname-plays.tsv" , 'r') as songFile:
for line in songFile:
linenum = linenum + 1
if linenum % 10000 == 0:
print(linenum)
lineStrip = line.rstrip().split("\t")
if len(lineStrip) == 4:
#print(line)
user_id = lineStrip[0]
musicbrainz_artistid = lineStrip[1]
artist_name = lineStrip[2]
plays = 1
if lineStrip[3] != "":
plays = int(lineStrip[3])
session.run("MERGE (a:Artist {artist_name: {artist_name}})", {"artist_name": artist_name})
session.run("MATCH (p:Person {user_id: {user_id}}), (a:Artist {artist_name: {artist_name}}) CREATE (p)-[:LIKES {times: {plays}}]->(a)", {"user_id": user_id, "artist_name": artist_name, "plays": plays})
session.close()
它开始执行时没有任何错误(顺便说一句,它非常慢,花了几个小时),但过了一段时间它在某个点挂起(例如在几百万行之后)。即使我的 python 脚本挂起,我仍然可以通过浏览器查询。
我唯一的限制是
create constraint on (p:Person) assert p.user_id is unique;
create constraint on (a:Artist) assert a.artist_name is unique;
我在配备 8GB 内存的 Macbook 上使用 neo4j 3.0.7。我也在使用官方支持的 python driver by neo4j.
任何帮助将不胜感激!
每次调用 session.run()
都会执行以下操作:
- 开始交易。
- 执行传递给
run()
的 Cypher 语句。
- 提交(或回滚)事务。
在您的情况下,您实际上是在每个输入行进行 两次 session.run()
调用。不仅如此,第二次调用中的 Cypher 语句必须 MATCH
Cypher 在第一次调用中获得的 Artist
节点。
由于您的输入文件有 1750 万行,这意味着您有 creating/committing/closing 3500 万笔交易。此外,您正在执行 1750 万次不必要的 MATCH
操作。这 非常 昂贵,并且有时还可能导致 driver 绊倒。
建议:
您应该在同一事务中对多个操作进行批处理。例如,如果您在每个事务中批处理 10K 个操作,则 1750 万个输入行只需要 1750 个事务。
您应该将两个 Cypher 语句合并为一个。
例如,如果您将代码更改为:
,您应该会得到更好的结果
为每批 10K 个元素生成一个 list
数组参数,它(如果打印得很好)看起来像这样:
{"list":
[
{"id": 1, "name": 'aaa', "plays": 3},
{"id": 2, "name": 'bbb', "plays": 2},
{"id": 3, "name": 'ccc', "plays": 3},
...
{"id": 10000, "name": 'xyz', "plays": 7}
]
}
使用以下 Cypher 语句:
UNWIND {list} AS d
MATCH (p:Person {user_id: d.id})
MERGE (a:Artist {artist_name: d.name})
MERGE (p)-[:LIKES {times: d.plays}]->(a)
使用上述 Cypher 语句和参数调用 session.run()
(每 10K 输入行一次)。
我正在尝试将 lastfm360K 数据库导入 Neo4j 数据库。我首先将所有用户作为节点插入,下面的代码没有任何问题
import re
from datetime import datetime
from elasticsearch import Elasticsearch
import certifi
from neo4j.v1 import GraphDatabase, basic_auth
driver = GraphDatabase.driver("bolt://localhost", auth=basic_auth("neo4j", "pass"))
session = driver.session()
with open("/Users/inanc/Documents/Software/lastfm-dataset-360K/usersha1-profile.tsv" , 'r') as userFile:
#first_line = userFile.readline()
linenum = 0
for line in userFile:
linenum = linenum + 1
if linenum % 1000 == 0:
print(linenum)
lineStrip = line.rstrip().split("\t")
tempDict = {}
tempDict["user_id"] = lineStrip[0]
if len(lineStrip) > 1:
tempDict["gender"] = lineStrip[1]
if lineStrip[2] != "":
tempDict["age"] = int(lineStrip[2])
tempDict["country"] = lineStrip[3]
tempDict["signup"] = lineStrip[4]
session.run("CREATE (a:Person {dict})", {"dict": tempDict})
session.close()
然后我想添加艺术家节点和与用户的关系如下
import re
from datetime import datetime
from elasticsearch import Elasticsearch
import certifi
from neo4j.v1 import GraphDatabase, basic_auth
driver = GraphDatabase.driver("bolt://localhost", auth=basic_auth("neo4j", "pass"))
session = driver.session()
linenum = 0
with open("/Users/inanc/Documents/Software/lastfm-dataset-360K/usersha1-artmbid-artname-plays.tsv" , 'r') as songFile:
for line in songFile:
linenum = linenum + 1
if linenum % 10000 == 0:
print(linenum)
lineStrip = line.rstrip().split("\t")
if len(lineStrip) == 4:
#print(line)
user_id = lineStrip[0]
musicbrainz_artistid = lineStrip[1]
artist_name = lineStrip[2]
plays = 1
if lineStrip[3] != "":
plays = int(lineStrip[3])
session.run("MERGE (a:Artist {artist_name: {artist_name}})", {"artist_name": artist_name})
session.run("MATCH (p:Person {user_id: {user_id}}), (a:Artist {artist_name: {artist_name}}) CREATE (p)-[:LIKES {times: {plays}}]->(a)", {"user_id": user_id, "artist_name": artist_name, "plays": plays})
session.close()
它开始执行时没有任何错误(顺便说一句,它非常慢,花了几个小时),但过了一段时间它在某个点挂起(例如在几百万行之后)。即使我的 python 脚本挂起,我仍然可以通过浏览器查询。
我唯一的限制是
create constraint on (p:Person) assert p.user_id is unique;
create constraint on (a:Artist) assert a.artist_name is unique;
我在配备 8GB 内存的 Macbook 上使用 neo4j 3.0.7。我也在使用官方支持的 python driver by neo4j.
任何帮助将不胜感激!
每次调用 session.run()
都会执行以下操作:
- 开始交易。
- 执行传递给
run()
的 Cypher 语句。 - 提交(或回滚)事务。
在您的情况下,您实际上是在每个输入行进行 两次 session.run()
调用。不仅如此,第二次调用中的 Cypher 语句必须 MATCH
Cypher 在第一次调用中获得的 Artist
节点。
由于您的输入文件有 1750 万行,这意味着您有 creating/committing/closing 3500 万笔交易。此外,您正在执行 1750 万次不必要的 MATCH
操作。这 非常 昂贵,并且有时还可能导致 driver 绊倒。
建议:
您应该在同一事务中对多个操作进行批处理。例如,如果您在每个事务中批处理 10K 个操作,则 1750 万个输入行只需要 1750 个事务。
您应该将两个 Cypher 语句合并为一个。
例如,如果您将代码更改为:
,您应该会得到更好的结果为每批 10K 个元素生成一个
list
数组参数,它(如果打印得很好)看起来像这样:{"list": [ {"id": 1, "name": 'aaa', "plays": 3}, {"id": 2, "name": 'bbb', "plays": 2}, {"id": 3, "name": 'ccc', "plays": 3}, ... {"id": 10000, "name": 'xyz', "plays": 7} ] }
使用以下 Cypher 语句:
UNWIND {list} AS d MATCH (p:Person {user_id: d.id}) MERGE (a:Artist {artist_name: d.name}) MERGE (p)-[:LIKES {times: d.plays}]->(a)
使用上述 Cypher 语句和参数调用
session.run()
(每 10K 输入行一次)。