Locking in dask.multiprocessing.get and adding metadata to HDF

Performing an ETL task in pure Python, I want to collect error metrics and metadata for every raw input file that is considered (the error metrics are computed from the data section of a file based on supplied error codes, while the metadata is stored in the headers). Below is pseudo-code for the whole process:

import os
from multiprocessing import Lock

import pandas as pd
import dask
import dask.multiprocessing
from dask import delayed
from dask import dataframe as dd

META_DATA = {}  # shared resource
ERRORS = {}  # shared resource
lock = Lock()  # lock guarding the shared resources

def read_file(file_name):
    global META_DATA, ERRORS

    # step 1: process headers
    headers = read_header(file_name)
    errors = {}
    data_bfr = []

    # step 2: process data section
    for line in data_section:
        content_id, data = parse_line(line)
        if contains_errors(data):
            errors[content_id] = get_error_code(data)
        else:
            data_bfr.append((content_id, data))  # collect valid rows as (id, data) tuples

    # ---- Part relevant for question 1 ----
    # step 3: acquire lock for shared resource and write metadata
    with lock:  # a Lock is itself a context manager; acquire()/release() are handled by 'with'
        write_metadata(file_name, headers)  # stores metadata in META_DATA[file_name]
        write_errors(file_name, errors)  # stores error metrics in ERRORS[file_name]

    return pd.DataFrame(data=data_bfr,...)

with dask.set_options(get=dask.multiprocessing.get):
    df = dd.from_delayed([delayed(read_file)(file_name)
                          for file_name in os.listdir(wd)])

    # ---- Part relevant for question 2 ----
    df.to_hdf('data.hdf', '/data', 'w', complevel=9,
              complib='blosc', ..., metadata=(META_DATA, ERRORS))

For each input file, read_file returns a pd.DataFrame and additionally writes the relevant metadata and error metrics to the shared resources. I compute the dask.dataframe from the list of delayed read_file calls using dask's multiprocessing scheduler. Question 1: does acquiring the lock work as intended under the multiprocessing scheduler? Question 2: is there a way to store the collected metadata in the HDF file along with the data?

Do not rely on global variables

Dask works best with pure functions.

In particular, your case runs into a limitation of Python, which (correctly) does not share global data between processes. Instead, I recommend that you return the data explicitly from your functions:

def read_file(file_name):
    ...
    return df, metadata, errors

values = [delayed(read_file)(fn) for fn in filenames]
dfs      = [v[0] for v in values]  # delayed DataFrames
metadata = [v[1] for v in values]  # delayed per-file metadata dicts
errors   = [v[2] for v in values]  # delayed per-file error dicts

df = dd.from_delayed(dfs)

import toolz
metadata = delayed(toolz.merge)(metadata)  # lazily merge the per-file metadata dicts
errors = delayed(toolz.merge)(errors)      # lazily merge the per-file error dicts
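
This settles the locking part: with the data returned explicitly there is no shared state left to protect, so no lock is needed. For the HDF part, neither pandas' nor dask's to_hdf takes a metadata keyword, but the merged dicts can be computed together with the frame and then attached as attributes of the stored node. A minimal sketch of one way to finish the pipeline (not part of the original answer), assuming pandas with the PyTables backend; the attribute names file_metadata and file_errors are arbitrary:

import dask
import pandas as pd

# Write the data and compute the merged dicts with the multiprocessing scheduler.
with dask.set_options(get=dask.multiprocessing.get):
    df.to_hdf('data.hdf', '/data', mode='w', complevel=9, complib='blosc')
    metadata, errors = dask.compute(metadata, errors)

# Attach the dicts as attributes of the /data node (hypothetical attribute names).
with pd.HDFStore('data.hdf', mode='a') as store:
    store.get_storer('/data').attrs.file_metadata = metadata
    store.get_storer('/data').attrs.file_errors = errors

Reading them back later is symmetric: open the store again and access store.get_storer('/data').attrs.file_metadata.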