如何将 dask.dataframe 与自定义 dsk 图表一起使用

how can one use dask.dataframe with custom dsk graphs

我会尝试重新表述我的问题:

如何将 dask.dataframe 与 zip 等函数结合使用?

假设我们有一个名为 "accounts.0.csv" 的文件,其中包含以下数据

id,names,amount
352,Dan,4837
387,Tim,208
42,Jerry,21
129,Patricia,284

我写了这段代码

import dask.dataframe as dd
import itertools
from dask.threaded import get


df = dd.read_csv('accounts.0.csv')

dsk = {'a': (dd.read_csv,('accounts.0.csv')),       
       'b': (itertools.repeat,(True)),       
       'res': (zip, 'a'[id],'b')       
       }

get(dsk, 'res')

此代码应生成如下内容:

352, True
387, True
42 , True
129, True

我该怎么做?

你需要"lift"(借用Haskell中Monads的术语)迭代器从计算内部出来,dask在开始任何计算之前构建任务列表,因此您需要从 "outside" 任何计算中获取迭代器。您对 compute 的呼叫让您 "outside" 感到震惊,这就是为什么它起作用了。

我不确定一个好的例子,因为你要做什么取决于接下来的其他任务,但作为一个不是很好但最小的例子:

import dask.imperative as di

arr = []
for col in df:
    arr.append(ddf[col].map(lambda x: (x,True)))
task = di.value([])+arr

创建一个任务列表,映射每个系列中的值。 then 使用命令式模块将所有内容包装在任务中——找不到更好的方法,抱歉!

然后您可以 compute 获取系列列表的任务,或将其用于其他用途。

改写问题

我会尝试将您的问题改写如下:

如何将 dask.dataframe 与自定义 dask 图表结合起来?

df = dd.read_csv('myfile.csv')
dsk = {'x': (add, 1, 2)}

dataframe 是高级集合,dask graph 是低级的。我们必须让其中一个达到其他人的水平。

使用 dask 命令

我们可以使用 dask.imperative 将自定义函数转换为高级 dsak 对象

# dsk = {'x': (inc, 1, 2)}
x = dask.do(add)(1, 2)

然后您可以对其中一个或两个对象使用 dask.compute

x_result = dask.compute(x)
or
df_result = dask.compute(df)
or
x_result, df_result = dask.compute(x, df)

到处使用低级别的 dask 图

可从 .dask._keys() 属性访问任何 DataFrame 对象的低级图和最终键。

from toolz import merge
graph = merge(dsk, df.dask)  # merge both graphs together
keys = ['x', df._keys()]     # final keys to compute

x_results, df_results = get(graph, keys)

df_result = df._finalize(df_results)  # turn graph outputs back to pandas dataframe

Zip 适用于 Python 迭代器,不适用于 Pandas 或 Dask DataFrames。

要实现上面的示例,您可以使用 assign 方法

pandas

In [1]: import pandas as pd

In [2]: df = pd.DataFrame({'x': [1, 2, 3]})

In [3]: df
Out[3]: 
   x
0  1
1  2
2  3

In [4]: df.assign(y=True)
Out[4]: 
   x     y
0  1  True
1  2  True
2  3  True

dask.dataframe

In [5]: import dask.dataframe as dd

In [6]: ddf = dd.from_pandas(df, npartitions=1)

In [7]: ddf.assign(y=True).compute()
Out[7]: 
   x     y
0  1  True
1  2  True
2  3  True

通常不要将图表与数据框混用

dsk = {...} 这样的字典样式的图表不应与dask.dataframe 对象混合。 dask.dataframe 对象在内部使用图表。它们不应该放在它们里面。