dask: return None or empty from delayed task

I want to return an empty DataFrame or None from any delayed task in a set whose file fails to parse, for example:

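import os

import dask
import dask.dataframe as dd
import pandas as pd


def _read(filename):
    # Parse one semicolon-separated file; fall back to an empty frame on failure.
    try:
        return pd.read_csv(filename, sep=';', decimal=',', encoding='latin1', index_col=False)
    except Exception:
        # An empty frame has no columns, so it does not match the other partitions.
        return pd.DataFrame()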


tasks = []
for root, dirs, files in os.walk(os.path.join(self._path, "files")):
    for file in files:
        tasks.append(dask.delayed(_read, pure=True)(os.path.join(root, file)))

ddf = dd.from_delayed(tasks)

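One or two of the files fail to parse, and I currently get a metadata mismatch. I could return a DataFrame that matches the dask DataFrame's metadata, but I was wondering whether there is a better way.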

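Following @mdurant's comment: copying a DataFrame while preserving its dtypes is not as easy as you might expect, but this seems to work. Of course, it will not work if the first file is the one that fails, because no default record has been captured yet.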

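import os

import dask
import dask.dataframe as dd
import pandas as pd

_default_record = None


def _read(filename):
    global _default_record
    try:
        df = pd.read_csv(filename, sep=';', decimal=',', encoding='latin1', index_col=False)
        if _default_record is None:
            # Empty frame with the same column names and dtypes as the first parsed file.
            # DataFrame.from_items and iteritems only exist in older pandas releases.
            _default_record = pd.DataFrame.from_items([
                (name, pd.Series(data=None, dtype=series.dtype))
                for name, series in df.head(1).iteritems()])
        return df
    except Exception:
        # Still None if no file has been parsed successfully yet.
        return _default_record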


tasks = []
for root, dirs, files in os.walk(os.path.join(self._path, "files")):
    for file in files:
        tasks.append(dask.delayed(_read, pure=True)(os.path.join(root, file)))

ddf = dd.from_delayed(tasks)
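
As a side note, a zero-row slice of a successfully parsed frame keeps its column names and dtypes, so it may be a simpler way to build the fallback record. This is only a minimal sketch and not part of the answer above: the sample columns and the helper name `empty_like` are made up for illustration.

```python
import pandas as pd


def empty_like(df: pd.DataFrame) -> pd.DataFrame:
    """Return a zero-row frame with the same columns and dtypes as df (hypothetical helper)."""
    return df.iloc[:0].copy()


# Made-up sample standing in for one successfully parsed file.
sample = pd.DataFrame({"id": [1, 2], "price": [1.5, 2.5], "label": ["a", "b"]})
fallback = empty_like(sample)
```

Inside `_read`, the same slice could be assigned to `_default_record` in place of the per-column construction.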

This is @morganics's answer, updated for what I assume are newer versions of pandas (1.1.5) and dask (2020.12.0).

import os

import dask
import dask.dataframe as dd
import pandas as pd

_default_record = None


def _read(filename):
    global _default_record
    try:
        df = pd.read_csv(filename, sep=';', decimal=',', encoding='latin1', index_col=False)
        if _default_record is None:
            # Empty frame with the same column names and dtypes as the first parsed file.
            _default_record = pd.DataFrame({
                name: pd.Series(data=None, dtype=series.dtype)
                for name, series in df.head(1).items()})
        return df
    except Exception:
        # Still None if no file has been parsed successfully yet.
        return _default_record


tasks = []
for root, dirs, files in os.walk(os.path.join(self._path, "files")):
    for file in files:
        tasks.append(dask.delayed(_read, pure=True)(os.path.join(root, file)))

ddf = dd.from_delayed(tasks)

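_default_record is declared global inside _read so that the assignment updates the module-level variable, and I replaced the from_items call with the plain DataFrame constructor, because from_items does not exist in my version of pandas.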