我实际上如何让 dask 计算延迟或基于 dask 容器的结果列表?

How do I actually get dask to compute a list of delayed or dask-container-based results?

我有一个简单的可并行化任务,即为拆分成多个文件的多个表独立计算结果。我可以构建延迟或 dask.dataframe 列表(并且也尝试过,例如字典),但我无法计算所有结果(我可以使用 [=12= 从 dask 图形样式字典中获取单个结果],但同样无法轻松计算出所有结果)。这是一个最小的例子:

>>> df = dd.from_pandas(pd.DataFrame({'a': [1,2]}), npartitions=1)
>>> numbers = [df['a'].mean() for _ in range(2)]
>>> dd.compute(numbers)
([<dask.dataframe.core.Scalar at 0x7f91d1523978>,
  <dask.dataframe.core.Scalar at 0x7f91d1523a58>],)

同样:

>>> from dask import delayed
>>> @delayed
... def mean(data):
...     sum(data) / len(data)
>>> delayed_numbers = [mean([1,2]) for _ in range(2)]
>>> dask.compute(delayed_numbers)
([Delayed('mean-0e0a0dea-fa92-470d-b06e-b639fbaacae3'),
  Delayed('mean-89f2e361-03b6-4279-bef7-572ceac76324')],)

我想得到 [3, 3],这是我基于 delayed collections docs 所期望的结果。

对于我真正的问题,我实际上想在 HDF5 文件中的表上进行计算,但考虑到我可以让它与 dask.get() 一起工作,我很确定我正在指定我的延迟 / dask 数据框已经正确了。

我会对直接生成字典的解决方案感兴趣,但我也可以 return dict() 的(键,值)元组列表,这可能不是巨大的性能损失。

Compute 将许多集合作为单独的参数。尝试按如下方式阐述你的论点:

In [1]: import dask.dataframe as dd

In [2]: import pandas as pd

In [3]: df = dd.from_pandas(pd.DataFrame({'a': [1,2]}), npartitions=1)

In [4]: numbers = [df['a'].mean() for _ in range(2)]

In [5]: dd.compute(*numbers)  # note the *
Out[5]: (1.5, 1.5)

或者,可能更常见:

In [6]: dd.compute(df.a.mean(), df.a.std())
Out[6]: (1.5, 0.707107)