dask:从延迟任务中返回 None 或空数据帧
dask: return None or empty from delayed task
在一组延迟任务中,我想让解析失败的任务返回一个空数据帧或 None,例如:
import dask.dataframe as dd
import dask.delayed
def _read(self, filename):
    """Read one semicolon-separated CSV file into a pandas DataFrame.

    The file is parsed with ``;`` as the field separator, ``,`` as the
    decimal mark and latin-1 encoding.

    Args:
        self: Not used by this function.
        filename: Path of the CSV file to read.

    Returns:
        The parsed DataFrame, or an empty ``pd.DataFrame()`` (no rows, no
        columns) when the file cannot be read or parsed.
    """
    try:
        return pd.read_csv(
            filename, sep=';', decimal=',', encoding='latin1', index_col=False
        )
    except Exception:
        # Exception rather than BaseException: KeyboardInterrupt and
        # SystemExit must still propagate instead of being converted into
        # an empty frame.
        return pd.DataFrame()
# Create one lazy read task per file found anywhere under
# <self._path>/files, then combine the tasks into one dask DataFrame.
# NOTE(review): `_read` is declared as (self, filename) but receives a
# single positional argument here - confirm how it is meant to be bound.
base_dir = os.path.join(self._path, "files")
lazy_read = dask.delayed(_read, pure=True)
tasks = [
    lazy_read(os.path.join(root, name))
    for root, dirs, files in os.walk(base_dir)
    for name in files
]
ddf = dd.from_delayed(tasks)
其中有一两个文件解析失败,目前我会收到"元数据不匹配"的错误。我可以返回一个与 dask 数据帧元数据一致的数据帧,但想知道是否有更好的方法。
根据 @mdurant 的评论:在保留列类型的前提下复制数据帧,并不像您期望的那么容易,但下面的做法似乎可行。当然,如果第一个文件就出错,这种方法将无法工作。
import dask.dataframe as dd
import dask.delayed
# Zero-row frame with the columns and dtypes of the first file that was
# parsed successfully; stays None until such a file has been read.
_default_record = None


def _read(self, filename):
    """Read one CSV file, falling back to an empty frame of the same schema.

    The file is parsed with ``;`` as the field separator, ``,`` as the
    decimal mark and latin-1 encoding. The first successful read caches a
    zero-row copy of its result in the module-level ``_default_record``.

    Args:
        self: Not used by this function.
        filename: Path of the CSV file to read.

    Returns:
        The parsed DataFrame. When reading fails, the cached zero-row frame
        instead, or None if no file has been parsed successfully so far.
    """
    # Without this declaration the assignment below makes the name local
    # and every access raises UnboundLocalError.
    global _default_record
    try:
        df = pd.read_csv(
            filename, sep=';', decimal=',', encoding='latin1', index_col=False
        )
    except Exception:
        # Exception rather than BaseException, so KeyboardInterrupt and
        # SystemExit are not swallowed.
        return _default_record
    if _default_record is None:
        # A zero-row slice keeps column names and dtypes, and does not rely
        # on DataFrame.from_items / iteritems, which newer pandas removed.
        _default_record = df.iloc[:0].copy()
    return df
# Create one lazy read task per file found anywhere under
# <self._path>/files, then combine the tasks into one dask DataFrame.
# NOTE(review): `_read` is declared as (self, filename) but receives a
# single positional argument here - confirm how it is meant to be bound.
base_dir = os.path.join(self._path, "files")
lazy_read = dask.delayed(_read, pure=True)
tasks = [
    lazy_read(os.path.join(root, name))
    for root, dirs, files in os.walk(base_dir)
    for name in files
]
ddf = dd.from_delayed(tasks)
以下是对 @morganics 回答的更新,适用于(我猜是)较新版本的 pandas (1.1.5) 和 dask (2020.12.0)。
import dask.dataframe as dd
import dask.delayed
import pandas as pd
# Zero-row frame with the columns and dtypes of the first file that was
# parsed successfully; stays None until such a file has been read.
_default_record = None


def _read(self, filename):
    """Read one CSV file, falling back to an empty frame of the same schema.

    The file is parsed with ``;`` as the field separator, ``,`` as the
    decimal mark and latin-1 encoding. The first successful read caches a
    zero-row copy of its result in the module-level ``_default_record``.

    Args:
        self: Not used by this function.
        filename: Path of the CSV file to read.

    Returns:
        The parsed DataFrame. When reading fails, the cached zero-row frame
        instead, or None if no file has been parsed successfully so far.
    """
    global _default_record
    try:
        df = pd.read_csv(
            filename, sep=';', decimal=',', encoding='latin1', index_col=False
        )
    except Exception:
        # Exception rather than BaseException, so KeyboardInterrupt and
        # SystemExit are not swallowed.
        return _default_record
    if _default_record is None:
        # A zero-row slice keeps column names and dtypes; it replaces the
        # per-column Series construction, whose brackets were unbalanced
        # and which relied on iteritems (removed in newer pandas).
        _default_record = df.iloc[:0].copy()
    return df
# Create one lazy read task per file found anywhere under
# <self._path>/files, then combine the tasks into one dask DataFrame.
# NOTE(review): `_read` is declared as (self, filename) but receives a
# single positional argument here - confirm how it is meant to be bound.
base_dir = os.path.join(self._path, "files")
lazy_read = dask.delayed(_read, pure=True)
tasks = [
    lazy_read(os.path.join(root, name))
    for root, dirs, files in os.walk(base_dir)
    for name in files
]
ddf = dd.from_delayed(tasks)
我将 _default_record 声明为全局变量,并去掉了构造数据帧时使用的 from_items
方法,因为它在我使用的 pandas 版本中不存在。
在一组延迟任务中,我想让解析失败的任务返回一个空数据帧或 None,例如:
import dask.dataframe as dd
import dask.delayed
def _read(self, filename):
    """Read one semicolon-separated CSV file into a pandas DataFrame.

    The file is parsed with ``;`` as the field separator, ``,`` as the
    decimal mark and latin-1 encoding.

    Args:
        self: Not used by this function.
        filename: Path of the CSV file to read.

    Returns:
        The parsed DataFrame, or an empty ``pd.DataFrame()`` (no rows, no
        columns) when the file cannot be read or parsed.
    """
    try:
        return pd.read_csv(
            filename, sep=';', decimal=',', encoding='latin1', index_col=False
        )
    except Exception:
        # Exception rather than BaseException: KeyboardInterrupt and
        # SystemExit must still propagate instead of being converted into
        # an empty frame.
        return pd.DataFrame()
# Create one lazy read task per file found anywhere under
# <self._path>/files, then combine the tasks into one dask DataFrame.
# NOTE(review): `_read` is declared as (self, filename) but receives a
# single positional argument here - confirm how it is meant to be bound.
base_dir = os.path.join(self._path, "files")
lazy_read = dask.delayed(_read, pure=True)
tasks = [
    lazy_read(os.path.join(root, name))
    for root, dirs, files in os.walk(base_dir)
    for name in files
]
ddf = dd.from_delayed(tasks)
其中有一两个文件解析失败,目前我会收到"元数据不匹配"的错误。我可以返回一个与 dask 数据帧元数据一致的数据帧,但想知道是否有更好的方法。
根据 @mdurant 的评论:在保留列类型的前提下复制数据帧,并不像您期望的那么容易,但下面的做法似乎可行。当然,如果第一个文件就出错,这种方法将无法工作。
import dask.dataframe as dd
import dask.delayed
# Zero-row frame with the columns and dtypes of the first file that was
# parsed successfully; stays None until such a file has been read.
_default_record = None


def _read(self, filename):
    """Read one CSV file, falling back to an empty frame of the same schema.

    The file is parsed with ``;`` as the field separator, ``,`` as the
    decimal mark and latin-1 encoding. The first successful read caches a
    zero-row copy of its result in the module-level ``_default_record``.

    Args:
        self: Not used by this function.
        filename: Path of the CSV file to read.

    Returns:
        The parsed DataFrame. When reading fails, the cached zero-row frame
        instead, or None if no file has been parsed successfully so far.
    """
    # Without this declaration the assignment below makes the name local
    # and every access raises UnboundLocalError.
    global _default_record
    try:
        df = pd.read_csv(
            filename, sep=';', decimal=',', encoding='latin1', index_col=False
        )
    except Exception:
        # Exception rather than BaseException, so KeyboardInterrupt and
        # SystemExit are not swallowed.
        return _default_record
    if _default_record is None:
        # A zero-row slice keeps column names and dtypes, and does not rely
        # on DataFrame.from_items / iteritems, which newer pandas removed.
        _default_record = df.iloc[:0].copy()
    return df
# Create one lazy read task per file found anywhere under
# <self._path>/files, then combine the tasks into one dask DataFrame.
# NOTE(review): `_read` is declared as (self, filename) but receives a
# single positional argument here - confirm how it is meant to be bound.
base_dir = os.path.join(self._path, "files")
lazy_read = dask.delayed(_read, pure=True)
tasks = [
    lazy_read(os.path.join(root, name))
    for root, dirs, files in os.walk(base_dir)
    for name in files
]
ddf = dd.from_delayed(tasks)
以下是对 @morganics 回答的更新,适用于(我猜是)较新版本的 pandas (1.1.5) 和 dask (2020.12.0)。
import dask.dataframe as dd
import dask.delayed
import pandas as pd
# Zero-row frame with the columns and dtypes of the first file that was
# parsed successfully; stays None until such a file has been read.
_default_record = None


def _read(self, filename):
    """Read one CSV file, falling back to an empty frame of the same schema.

    The file is parsed with ``;`` as the field separator, ``,`` as the
    decimal mark and latin-1 encoding. The first successful read caches a
    zero-row copy of its result in the module-level ``_default_record``.

    Args:
        self: Not used by this function.
        filename: Path of the CSV file to read.

    Returns:
        The parsed DataFrame. When reading fails, the cached zero-row frame
        instead, or None if no file has been parsed successfully so far.
    """
    global _default_record
    try:
        df = pd.read_csv(
            filename, sep=';', decimal=',', encoding='latin1', index_col=False
        )
    except Exception:
        # Exception rather than BaseException, so KeyboardInterrupt and
        # SystemExit are not swallowed.
        return _default_record
    if _default_record is None:
        # A zero-row slice keeps column names and dtypes; it replaces the
        # per-column Series construction, whose brackets were unbalanced
        # and which relied on iteritems (removed in newer pandas).
        _default_record = df.iloc[:0].copy()
    return df
# Create one lazy read task per file found anywhere under
# <self._path>/files, then combine the tasks into one dask DataFrame.
# NOTE(review): `_read` is declared as (self, filename) but receives a
# single positional argument here - confirm how it is meant to be bound.
base_dir = os.path.join(self._path, "files")
lazy_read = dask.delayed(_read, pure=True)
tasks = [
    lazy_read(os.path.join(root, name))
    for root, dirs, files in os.walk(base_dir)
    for name in files
]
ddf = dd.from_delayed(tasks)
我将 _default_record 声明为全局变量,并去掉了构造数据帧时使用的 from_items
方法,因为它在我使用的 pandas 版本中不存在。