在 PYTHON 中使用 DASK 读取文件并写入 NEO4J
Using DASK to read files and write to NEO4J in PYTHON
我在并行化读取一些文件并写入 neo4j 的代码时遇到问题。
- 我正在使用 dask 并行化 process_language_files 函数(从底部数第 3 个单元格)。
- 我试着解释下面的代码,列出函数(前 3 个单元格)。
- 错误打印在最后(最后 2 个单元格)。
- 我还在最后列出了环境和包版本。
如果我依次删除 dask.delayed 和 运行 这段代码,它的效果非常好。
感谢您的帮助。 :)
============================================= =============================
与 neo4j 一起使用的一些函数。
from neo4j import GraphDatabase
from tqdm import tqdm
def get_driver(uri_scheme='bolt', host='localhost', port='7687', username='neo4j', password=''):
"""Get a neo4j driver."""
connection_uri = "{uri_scheme}://{host}:{port}".format(uri_scheme=uri_scheme, host=host, port=port)
auth = (username, password)
driver = GraphDatabase.driver(connection_uri, auth=auth)
return driver
def format_raw_res(raw_res):
"""Parse neo4j results"""
res = []
for r in raw_res:
res.append(r)
return res
def run_bulk_query(query_list, driver):
"""Run a list of neo4j queries in a session."""
results = []
with driver.session() as session:
for query in tqdm(query_list):
raw_res = session.run(query)
res = format_raw_res(raw_res)
results.append({'query':query, 'result':res})
return results
global_driver = get_driver(uri_scheme='bolt', host='localhost', port='8687', username='neo4j', password='abc123') # neo4j driver object.=
这就是我们创建 dask 客户端以进行并行化的方式。
from dask.distributed import Client
client = Client(threads_per_worker=4, n_workers=1)
主代码调用的函数。
import sys
import time
import json
import pandas as pd
import dask
def add_nodes(nodes_list, language_code):
"""Returns a list of strings. Each string is a cypher query to add a node to neo4j."""
list_of_create_strings = []
create_string_template = """CREATE (:LABEL {{node_id:{node_id}}})"""
for index, node in nodes_list.iterrows():
create_string = create_string_template.format(node_id=node['new_id'])
list_of_create_strings.append(create_string)
return list_of_create_strings
def add_relations(relations_list, language_code):
"""Returns a list of strings. Each string is a cypher query to add a relationship to neo4j."""
list_of_create_strings = []
create_string_template = """
MATCH (a),(b) WHERE a.node_id = {source} AND b.node_id = {target}
MERGE (a)-[r:KNOWS {{ relationship_id:{edge_id} }}]-(b)"""
for index, relations in relations_list.iterrows():
create_string = create_string_template.format(
source=relations['from'], target=relations['to'],
edge_id=''+str(relations['from'])+'-'+str(relations['to']))
list_of_create_strings.append(create_string)
return list_of_create_strings
def add_data(language_code, edges, features, targets, driver):
"""Add nodes and relationships to neo4j"""
add_nodes_cypher = add_nodes(targets, language_code) # Returns a list of strings. Each string is a cypher query to add a node to neo4j.
node_results = run_bulk_query(add_nodes_cypher, driver) # Runs each string in the above list in a neo4j session.
add_relations_cypher = add_relations(edges, language_code) # Returns a list of strings. Each string is a cypher query to add a relationship to neo4j.
relations_results = run_bulk_query(add_relations_cypher, driver) # Runs each string in the above list in a neo4j session.
# Saving some metadata
results = {
"nodes": {"results": node_results, "length":len(add_nodes_cypher),},
"relations": {"results": relations_results, "length":len(add_relations_cypher),},
}
return results
def load_data(language_code):
"""Load data from files"""
# Saving file names to variables
edges_filename = './edges.csv'
features_filename = './features.json'
target_filename = './target.csv'
# Loading data from the file names
edges = helper.read_csv(edges_filename)
features = helper.read_json(features_filename)
targets = helper.read_csv(target_filename)
# Saving some metadata
results = {
"edges": {"length":len(edges),},
"features": {"length":len(features),},
"targets": {"length":len(targets),},
}
return edges, features, targets, results
主要代码。
def process_language_files(process_language_files, driver):
"""Reads files, creates cypher queries to add nodes and relationships, runs cypher query in a neo4j session."""
edges, features, targets, reading_results = load_data(language_code) # Read files.
writing_results = add_data(language_code, edges, features, targets, driver) # Convert files nodes and relationships and add to neo4j in a neo4j session.
return {"reading_results": reading_results, "writing_results": writing_results} # Return some metadata
# Execution starts here
res=[]
for index, language_code in enumerate(['ENGLISH', 'FRENCH']):
lazy_result = dask.delayed(process_language_files)(language_code, global_driver)
res.append(lazy_result)
结果来自 res。这些是 dask 延迟对象。
print(*res)
Delayed('process_language_files-a73f4a9d-6ffa-4295-8803-7fe09849c068') Delayed('process_language_files-c88fbd4f-e8c1-40c0-b143-eda41a209862')
错误。即使使用 dask.compute(),我也会遇到类似的错误。
futures = dask.persist(*res)
AttributeError Traceback (most recent call last)
~/Code/miniconda3/envs/MVDS/lib/python3.6/site-packages/distributed/protocol/pickle.py in dumps(x, buffer_callback, protocol)
48 buffers.clear()
---> 49 result = pickle.dumps(x, **dump_kwargs)
50 if len(result) < 1000:
AttributeError: Can't pickle local object 'BoltPool.open.<locals>.opener
============================================= =============================
# Name
Version
Build
Channel
dask
2020.12.0
pyhd8ed1ab_0
conda-forge
jupyterlab
3.0.3
pyhd8ed1ab_0
conda-forge
neo4j-python-driver
4.2.1
pyh7fcb38b_0
conda-forge
python
3.9.1
hdb3f193_2
您收到此错误是因为您正试图在您的工作人员之间共享驱动程序对象。
驱动程序对象包含有关连接的私有数据,这些数据在进程外没有意义(也不可序列化)。
这就像试图在某处打开一个文件并在其他地方共享文件描述符。
它不起作用,因为文件编号仅在生成它的进程中才有意义。
如果您希望您的员工访问数据库或任何其他网络资源,您应该向他们提供连接资源的说明。
在您的情况下,您不应将 global_driver
作为参数传递,而应将连接参数传递给每个工作人员调用 get_driver
以获取其自己的驱动程序。
我在并行化读取一些文件并写入 neo4j 的代码时遇到问题。
- 我正在使用 dask 并行化 process_language_files 函数(从底部数第 3 个单元格)。
- 我试着解释下面的代码,列出函数(前 3 个单元格)。
- 错误打印在最后(最后 2 个单元格)。
- 我还在最后列出了环境和包版本。
如果我依次删除 dask.delayed 和 运行 这段代码,它的效果非常好。
感谢您的帮助。 :)
============================================= =============================
与 neo4j 一起使用的一些函数。
from neo4j import GraphDatabase
from tqdm import tqdm
def get_driver(uri_scheme='bolt', host='localhost', port='7687', username='neo4j', password=''):
"""Get a neo4j driver."""
connection_uri = "{uri_scheme}://{host}:{port}".format(uri_scheme=uri_scheme, host=host, port=port)
auth = (username, password)
driver = GraphDatabase.driver(connection_uri, auth=auth)
return driver
def format_raw_res(raw_res):
"""Parse neo4j results"""
res = []
for r in raw_res:
res.append(r)
return res
def run_bulk_query(query_list, driver):
"""Run a list of neo4j queries in a session."""
results = []
with driver.session() as session:
for query in tqdm(query_list):
raw_res = session.run(query)
res = format_raw_res(raw_res)
results.append({'query':query, 'result':res})
return results
global_driver = get_driver(uri_scheme='bolt', host='localhost', port='8687', username='neo4j', password='abc123') # neo4j driver object.=
这就是我们创建 dask 客户端以进行并行化的方式。
from dask.distributed import Client
client = Client(threads_per_worker=4, n_workers=1)
主代码调用的函数。
import sys
import time
import json
import pandas as pd
import dask
def add_nodes(nodes_list, language_code):
"""Returns a list of strings. Each string is a cypher query to add a node to neo4j."""
list_of_create_strings = []
create_string_template = """CREATE (:LABEL {{node_id:{node_id}}})"""
for index, node in nodes_list.iterrows():
create_string = create_string_template.format(node_id=node['new_id'])
list_of_create_strings.append(create_string)
return list_of_create_strings
def add_relations(relations_list, language_code):
"""Returns a list of strings. Each string is a cypher query to add a relationship to neo4j."""
list_of_create_strings = []
create_string_template = """
MATCH (a),(b) WHERE a.node_id = {source} AND b.node_id = {target}
MERGE (a)-[r:KNOWS {{ relationship_id:{edge_id} }}]-(b)"""
for index, relations in relations_list.iterrows():
create_string = create_string_template.format(
source=relations['from'], target=relations['to'],
edge_id=''+str(relations['from'])+'-'+str(relations['to']))
list_of_create_strings.append(create_string)
return list_of_create_strings
def add_data(language_code, edges, features, targets, driver):
"""Add nodes and relationships to neo4j"""
add_nodes_cypher = add_nodes(targets, language_code) # Returns a list of strings. Each string is a cypher query to add a node to neo4j.
node_results = run_bulk_query(add_nodes_cypher, driver) # Runs each string in the above list in a neo4j session.
add_relations_cypher = add_relations(edges, language_code) # Returns a list of strings. Each string is a cypher query to add a relationship to neo4j.
relations_results = run_bulk_query(add_relations_cypher, driver) # Runs each string in the above list in a neo4j session.
# Saving some metadata
results = {
"nodes": {"results": node_results, "length":len(add_nodes_cypher),},
"relations": {"results": relations_results, "length":len(add_relations_cypher),},
}
return results
def load_data(language_code):
"""Load data from files"""
# Saving file names to variables
edges_filename = './edges.csv'
features_filename = './features.json'
target_filename = './target.csv'
# Loading data from the file names
edges = helper.read_csv(edges_filename)
features = helper.read_json(features_filename)
targets = helper.read_csv(target_filename)
# Saving some metadata
results = {
"edges": {"length":len(edges),},
"features": {"length":len(features),},
"targets": {"length":len(targets),},
}
return edges, features, targets, results
主要代码。
def process_language_files(process_language_files, driver):
"""Reads files, creates cypher queries to add nodes and relationships, runs cypher query in a neo4j session."""
edges, features, targets, reading_results = load_data(language_code) # Read files.
writing_results = add_data(language_code, edges, features, targets, driver) # Convert files nodes and relationships and add to neo4j in a neo4j session.
return {"reading_results": reading_results, "writing_results": writing_results} # Return some metadata
# Execution starts here
res=[]
for index, language_code in enumerate(['ENGLISH', 'FRENCH']):
lazy_result = dask.delayed(process_language_files)(language_code, global_driver)
res.append(lazy_result)
结果来自 res。这些是 dask 延迟对象。
print(*res)
Delayed('process_language_files-a73f4a9d-6ffa-4295-8803-7fe09849c068') Delayed('process_language_files-c88fbd4f-e8c1-40c0-b143-eda41a209862')
错误。即使使用 dask.compute(),我也会遇到类似的错误。
futures = dask.persist(*res)
AttributeError Traceback (most recent call last)
~/Code/miniconda3/envs/MVDS/lib/python3.6/site-packages/distributed/protocol/pickle.py in dumps(x, buffer_callback, protocol)
48 buffers.clear()
---> 49 result = pickle.dumps(x, **dump_kwargs)
50 if len(result) < 1000:
AttributeError: Can't pickle local object 'BoltPool.open.<locals>.opener
============================================= =============================
# Name | Version | Build | Channel |
---|---|---|---|
dask | 2020.12.0 | pyhd8ed1ab_0 | conda-forge |
jupyterlab | 3.0.3 | pyhd8ed1ab_0 | conda-forge |
neo4j-python-driver | 4.2.1 | pyh7fcb38b_0 | conda-forge |
python | 3.9.1 | hdb3f193_2 |
您收到此错误是因为您正试图在您的工作人员之间共享驱动程序对象。
驱动程序对象包含有关连接的私有数据,这些数据在进程外没有意义(也不可序列化)。
这就像试图在某处打开一个文件并在其他地方共享文件描述符。 它不起作用,因为文件编号仅在生成它的进程中才有意义。
如果您希望您的员工访问数据库或任何其他网络资源,您应该向他们提供连接资源的说明。
在您的情况下,您不应将 global_driver
作为参数传递,而应将连接参数传递给每个工作人员调用 get_driver
以获取其自己的驱动程序。