批量加载neo4j
Batch loading neo4j
我正在使用以下脚本,通过 py2neo 批量加载 neo4j 图:
batch = neo4j.WriteBatch(graph)
counter = 0
for each in ans:
n1 = graph.merge_one("Page", "url", each[0])
# batch.create(n1)
counter +=1
for linkvalue in each[6]:
try:
text,link = linkvalue.split('!__!')
n2 = graph.merge_one("Page", "url", link)
# batch.create(n2)
counter+=1
rel = Relationship(n1,'LINKS',n2, anchor_text=text)
batch.create(rel)
except (KeyboardInterrupt, SystemExit):
print 'fail'
raise
if counter > 900:
counter = 0
batch.submit()
print 'submit'
batch = neo4j.WriteBatch(graph)
merge_one 每次都会直接调用图数据库,我认为这会拖慢我的算法。我注释掉了 batch.create(),因为它们会重复创建节点。有没有办法把这类操作也推迟到 batch.submit() 时一并执行,从而加快整个过程?
我正在处理大约 50,000 个节点和 1,000,000 个关系。
您需要将 Cypher 语句附加(append)到 WriteBatch 中,等批处理累积到一定数量的语句后再调用 run。
这是一个例子:
import json
from py2neo.neo4j import CypherQuery, GraphDatabaseService, WriteBatch
from py2neo import neo4j
db = neo4j.GraphDatabaseService()
business_index_query = CypherQuery(db, "CREATE INDEX ON :Business(id)")
business_index_query.execute()
category_index_query = CypherQuery(db, "CREATE INDEX ON :Category(name)")
category_index_query.execute()
create_business_query = '''
CREATE (b:Business {id: {business_id}, name: {name}, lat:{latitude},
lon:{longitude}, stars: {stars}, review_count: {review_count}})
'''
merge_category_query = '''
MATCH (b:Business {id: {business_id}})
MERGE (c:Category {name: {category}})
CREATE UNIQUE (c)<-[:IS_IN]-(b)
'''
print "Beginning business batch"
with open('data/yelp_academic_dataset_business.json', 'r') as f:
business_batch = WriteBatch(db)
count = 0
for b in (json.loads(l) for l in f):
business_batch.append_cypher(create_business_query, b)
count += 1
if count >= 10000:
business_batch.run()
business_batch.clear()
count = 0
if count > 0:
business_batch.run()
print "Beginning category batch"
with open('data/yelp_academic_dataset_business.json', 'r') as f:
category_batch = WriteBatch(db)
count = 0
for b in (json.loads(l) for l in f):
for c in b['categories']:
category_batch.append_cypher(merge_category_query, {'business_id': b['business_id'], 'category': c})
count += 1
if count >= 10000:
category_batch.run()
category_batch.clear()
count = 0
if count > 0:
category_batch.run()
请注意,此示例只使用 Cypher 语句,并将每条语句附加到 WriteBatch;示例中还使用了两个不同的 WriteBatch 实例,分别处理节点和关系。
我正在使用以下脚本,通过 py2neo 批量加载 neo4j 图:
batch = neo4j.WriteBatch(graph)
counter = 0
for each in ans:
n1 = graph.merge_one("Page", "url", each[0])
# batch.create(n1)
counter +=1
for linkvalue in each[6]:
try:
text,link = linkvalue.split('!__!')
n2 = graph.merge_one("Page", "url", link)
# batch.create(n2)
counter+=1
rel = Relationship(n1,'LINKS',n2, anchor_text=text)
batch.create(rel)
except (KeyboardInterrupt, SystemExit):
print 'fail'
raise
if counter > 900:
counter = 0
batch.submit()
print 'submit'
batch = neo4j.WriteBatch(graph)
merge_one 每次都会直接调用图数据库,我认为这会拖慢我的算法。我注释掉了 batch.create(),因为它们会重复创建节点。有没有办法把这类操作也推迟到 batch.submit() 时一并执行,从而加快整个过程?
我正在处理大约 50,000 个节点和 1,000,000 个关系。
您需要将 Cypher 语句附加(append)到 WriteBatch 中,等批处理累积到一定数量的语句后再调用 run。
这是一个例子:
import json
from py2neo.neo4j import CypherQuery, GraphDatabaseService, WriteBatch
from py2neo import neo4j
db = neo4j.GraphDatabaseService()
business_index_query = CypherQuery(db, "CREATE INDEX ON :Business(id)")
business_index_query.execute()
category_index_query = CypherQuery(db, "CREATE INDEX ON :Category(name)")
category_index_query.execute()
create_business_query = '''
CREATE (b:Business {id: {business_id}, name: {name}, lat:{latitude},
lon:{longitude}, stars: {stars}, review_count: {review_count}})
'''
merge_category_query = '''
MATCH (b:Business {id: {business_id}})
MERGE (c:Category {name: {category}})
CREATE UNIQUE (c)<-[:IS_IN]-(b)
'''
print "Beginning business batch"
with open('data/yelp_academic_dataset_business.json', 'r') as f:
business_batch = WriteBatch(db)
count = 0
for b in (json.loads(l) for l in f):
business_batch.append_cypher(create_business_query, b)
count += 1
if count >= 10000:
business_batch.run()
business_batch.clear()
count = 0
if count > 0:
business_batch.run()
print "Beginning category batch"
with open('data/yelp_academic_dataset_business.json', 'r') as f:
category_batch = WriteBatch(db)
count = 0
for b in (json.loads(l) for l in f):
for c in b['categories']:
category_batch.append_cypher(merge_category_query, {'business_id': b['business_id'], 'category': c})
count += 1
if count >= 10000:
category_batch.run()
category_batch.clear()
count = 0
if count > 0:
category_batch.run()
请注意,此示例只使用 Cypher 语句,并将每条语句附加到 WriteBatch;示例中还使用了两个不同的 WriteBatch 实例,分别处理节点和关系。