在 PYTHON 中使用 DASK 读取文件并写入 NEO4J

Using DASK to read files and write to NEO4J in PYTHON

我在并行化读取一些文件并写入 neo4j 的代码时遇到问题。

如果我去掉 dask.delayed 并按顺序(串行)运行这段代码,它可以正常工作。

感谢您的帮助。 :)

============================================= =============================

与 neo4j 一起使用的一些函数。

from neo4j import GraphDatabase
from tqdm import tqdm

def get_driver(uri_scheme='bolt', host='localhost', port='7687', username='neo4j', password=''):
    """Get a neo4j driver.

    Args:
        uri_scheme: Scheme part of the connection URI.
        host: Host part of the connection URI.
        port: Port part of the connection URI.
        username: User name used for authentication.
        password: Password used for authentication.

    Returns:
        The object returned by ``GraphDatabase.driver`` for the URI
        ``{uri_scheme}://{host}:{port}`` and the ``(username, password)``
        auth tuple.
    """
    connection_uri = "{uri_scheme}://{host}:{port}".format(uri_scheme=uri_scheme, host=host, port=port)
    auth = (username, password)
    return GraphDatabase.driver(connection_uri, auth=auth)

def format_raw_res(raw_res):
    """Parse neo4j results.

    Args:
        raw_res: An iterable of result records (for example the value
            returned by ``session.run``).

    Returns:
        A list containing every item yielded by ``raw_res``, in order.
    """
    # Materialize the iterable so the records can be used after iteration ends.
    return list(raw_res)

def run_bulk_query(query_list, driver):
    """Run a list of neo4j queries in a session.

    Args:
        query_list: Iterable of query strings; each one is passed to
            ``session.run`` in order.
        driver: Object exposing ``session()`` usable as a context manager
            (as returned by ``get_driver``).

    Returns:
        A list with one dict per query, of the form
        ``{'query': <query string>, 'result': <list of records>}``.
    """
    results = []
    # One session is opened for the whole batch and closed on exit.
    with driver.session() as session:
        for query in tqdm(query_list):  # tqdm wraps the iterable for progress reporting
            raw_res = session.run(query)
            # Records are collected into a list while still inside the session block.
            res = format_raw_res(raw_res)
            results.append({'query': query, 'result': res})
    return results

# NOTE(review): port '8687' differs from get_driver's default '7687', and the password is hard-coded — confirm both are intended.
global_driver = get_driver(uri_scheme='bolt', host='localhost', port='8687', username='neo4j', password='abc123')  # neo4j driver object.

这就是我们创建 dask 客户端以进行并行化的方式。

from dask.distributed import Client
# Create the dask distributed client with one worker and four threads per worker.
client = Client(threads_per_worker=4, n_workers=1)

主代码调用的函数。

import sys
import time
import json

import pandas as pd

import dask

def add_nodes(nodes_list, language_code):
    """Build the cypher queries that add nodes to neo4j.

    Args:
        nodes_list: pandas DataFrame with a ``new_id`` column; one query is
            produced per row.
        language_code: Currently unused; kept so the signature stays
            parallel to ``add_relations``.

    Returns:
        A list of strings. Each string is a cypher ``CREATE`` query that
        adds one node whose ``node_id`` is the row's ``new_id``.
    """
    # Doubled braces are literal braces after str.format().
    create_string_template = """CREATE (:LABEL {{node_id:{node_id}}})"""

    return [
        create_string_template.format(node_id=node['new_id'])
        for _, node in nodes_list.iterrows()
    ]
        
def add_relations(relations_list, language_code):
    """Build the cypher queries that add relationships to neo4j.

    Args:
        relations_list: pandas DataFrame with ``from`` and ``to`` columns;
            one query is produced per row.
        language_code: Currently unused; kept so the signature stays
            parallel to ``add_nodes``.

    Returns:
        A list of strings. Each string is a cypher query that matches the
        two nodes by ``node_id`` and merges a ``KNOWS`` relationship whose
        ``relationship_id`` is the string ``"<from>-<to>"``.
    """
    # The id is wrapped in single quotes so cypher stores it as a string.
    # Unquoted, `relationship_id:1-2` is parsed as the arithmetic
    # expression 1 - 2, not as the id "1-2".
    # Written as adjacent literals so the embedded newlines and the trailing
    # space after {target} stay visible and intact.
    create_string_template = (
        "\n"
        "        MATCH (a),(b) WHERE a.node_id = {source} AND b.node_id = {target} \n"
        "        MERGE (a)-[r:KNOWS {{ relationship_id:'{edge_id}' }}]-(b)"
    )

    list_of_create_strings = []
    for _, relation in relations_list.iterrows():
        source, target = relation['from'], relation['to']
        list_of_create_strings.append(
            create_string_template.format(
                source=source,
                target=target,
                edge_id=str(source) + '-' + str(target),
            )
        )

    return list_of_create_strings


def add_data(language_code, edges, features, targets, driver):
    """Add nodes and relationships to neo4j.

    Args:
        language_code: Forwarded to ``add_nodes`` and ``add_relations``.
        edges: Relationship rows, passed to ``add_relations``.
        features: Currently unused by this function.
        targets: Node rows, passed to ``add_nodes``.
        driver: neo4j driver handed to ``run_bulk_query``.

    Returns:
        A dict with ``"nodes"`` and ``"relations"`` entries, each holding the
        query ``"results"`` and the number of queries (``"length"``).
    """
    # Nodes are written first; the relationship queries MATCH on node_id.
    add_nodes_cypher = add_nodes(targets, language_code)  # One cypher query string per node.
    node_results = run_bulk_query(add_nodes_cypher, driver)  # Runs each query in a neo4j session.

    add_relations_cypher = add_relations(edges, language_code)  # One cypher query string per relationship.
    relations_results = run_bulk_query(add_relations_cypher, driver)  # Runs each query in a neo4j session.

    # Saving some metadata
    results = {
        "nodes": {"results": node_results, "length": len(add_nodes_cypher)},
        "relations": {"results": relations_results, "length": len(add_relations_cypher)},
    }
    return results


def load_data(language_code):
    """Load data from files.

    Args:
        language_code: Currently unused; the file names below are fixed.
            NOTE(review): presumably the paths were meant to depend on the
            language — confirm.

    Returns:
        A 4-tuple ``(edges, features, targets, results)`` where ``results``
        is a dict recording the length of each loaded object.
    """
    # Saving file names to variables
    edges_filename = './edges.csv'
    features_filename = './features.json'
    target_filename = './target.csv'

    # Loading data from the file names
    # NOTE(review): `helper` is not defined in the visible source; it is
    # assumed to be a module providing read_csv / read_json — confirm.
    edges = helper.read_csv(edges_filename)
    features = helper.read_json(features_filename)
    targets = helper.read_csv(target_filename)

    # Saving some metadata
    results = {
        "edges": {"length": len(edges)},
        "features": {"length": len(features)},
        "targets": {"length": len(targets)},
    }
    return edges, features, targets, results

主要代码。

def process_language_files(process_language_files, driver):
    """Read files, build cypher queries for nodes and relationships, and run them in a neo4j session.

    Args:
        process_language_files: The language code to process. The parameter
            name is kept unchanged for interface compatibility even though
            it shadows the function name inside the body.
        driver: neo4j driver forwarded to ``add_data``.

    Returns:
        A dict with ``"reading_results"`` (metadata from ``load_data``) and
        ``"writing_results"`` (metadata from ``add_data``).
    """
    # Bind the argument to the name the body uses. Without this, the body
    # would read a module-level `language_code` (or raise NameError) and
    # ignore the value that was passed in.
    language_code = process_language_files

    edges, features, targets, reading_results = load_data(language_code)  # Read files.

    # Convert the loaded rows to nodes and relationships and add them to neo4j.
    writing_results = add_data(language_code, edges, features, targets, driver)

    return {"reading_results": reading_results, "writing_results": writing_results}  # Return some metadata


# Execution starts here
# Collect one lazy dask task per language; nothing is executed in this loop.
res=[]
for index, language_code in enumerate(['ENGLISH', 'FRENCH']):
    
    # NOTE(review): global_driver is passed as a task argument, so dask must serialize it to hand the task to a worker — confirm the driver object is picklable.
    lazy_result = dask.delayed(process_language_files)(language_code, global_driver)
    res.append(lazy_result)

结果来自 res。这些是 dask 延迟对象。

print(*res)
Delayed('process_language_files-a73f4a9d-6ffa-4295-8803-7fe09849c068') Delayed('process_language_files-c88fbd4f-e8c1-40c0-b143-eda41a209862')

错误。即使使用 dask.compute(),我也会遇到类似的错误。

futures = dask.persist(*res)
AttributeError                            Traceback (most recent call last)
~/Code/miniconda3/envs/MVDS/lib/python3.6/site-packages/distributed/protocol/pickle.py in dumps(x, buffer_callback, protocol)
     48         buffers.clear()
---> 49         result = pickle.dumps(x, **dump_kwargs)
     50         if len(result) < 1000:

AttributeError: Can't pickle local object 'BoltPool.open.<locals>.opener'

============================================= =============================

# Name Version Build Channel
dask 2020.12.0 pyhd8ed1ab_0 conda-forge
jupyterlab 3.0.3 pyhd8ed1ab_0 conda-forge
neo4j-python-driver 4.2.1 pyh7fcb38b_0 conda-forge
python 3.9.1 hdb3f193_2

您收到此错误是因为您正试图在各个 worker(工作进程)之间共享驱动程序对象。

驱动程序对象包含有关连接的私有数据,这些数据在进程外没有意义(也不可序列化)。

这就像试图在某处打开一个文件并在其他地方共享文件描述符。 它不起作用,因为文件编号仅在生成它的进程中才有意义。

如果您希望各个 worker 访问数据库或任何其他网络资源,您应该向它们提供如何连接该资源的说明(即连接参数)。

在您的情况下,您不应将 global_driver 作为参数传递,而应将连接参数传递给每个 worker,让它自行调用 get_driver 以获取自己的驱动程序。