延迟函数的 dask 计算字典

dask compute dict of delayed functions

我想并行化这段代码:

-        "mean": float(zonal_extract.mean().compute()),
-        "min": float(zonal_extract.min().compute()),
-        "max": float(zonal_extract.max().compute()),
-        "sum": float(zonal_extract.sum().compute()),
-        "stddev": float(zonal_extract.std().compute()),
-        "var": float(zonal_extract.var().compute()),

这是我第一次尝试并行化 python 中的某些内容,这与一遍又一遍调用的函数不同。这将是相同的数据,不同的功能。

尝试 1

from dask import compute, delayed


results = delayed({})
results["mean"] = zonal_extract.mean
results["min"] = zonal_extract.min
results["max"] = zonal_extract.max
results["sum"] = zonal_extract.sum
results["stddev"] = zonal_extract.std
results["var"] = zonal_extract.var
results = compute(results, num_workers=4)  # , scheduler='processes'
results = {k: float(v) for k, v in results.items()}

尝试 2

mean, min, max, sum, stddev, var = compute(
    zonal_extract.mean(),
    zonal_extract.min(),
    zonal_extract.max(),
    zonal_extract.sum(),
    zonal_extract.std(),
    zonal_extract.var(),
    num_workers=4,
)  # , scheduler='processes'
results = {k: float(v) for k, v in dict(mean, min, max, sum, stddev, var).items()}

这似乎是一项简单的任务,但我找不到任何有用的东西。可能是因为我已经在多处理上下文和嵌套线程中(这可能不存在但听起来很酷)或者有错误:

    L = Parallel(n_jobs=-1)(
  File "/usr/local/lib/python3.9/dist-packages/joblib/parallel.py", line 1056, in __call__
    self.retrieve()
  File "/usr/local/lib/python3.9/dist-packages/joblib/parallel.py", line 935, in retrieve
    self._output.extend(job.get(timeout=self.timeout))
  File "/usr/local/lib/python3.9/dist-packages/joblib/_parallel_backends.py", line 542, in wrap_future_result
    return future.result(timeout=timeout)
  File "/usr/lib/python3.9/concurrent/futures/_base.py", line 445, in result
    return self.__get_result()
  File "/usr/lib/python3.9/concurrent/futures/_base.py", line 390, in __get_result
    raise self._exception
TypeError: Delayed objects of unspecified length are not iterable

real    0m25.048s
user    0m46.943s

编辑:

ohhh 是因为延迟函数正在覆盖 joblib 的

from dask import compute, delayed
from joblib import Parallel, delayed

主要问题是我导入了两个函数名称相同的东西。改变这个

from dask import compute, delayed
from joblib import Parallel, delayed

至此

import dask
from joblib import Parallel, delayed

然后第二次尝试代码开始工作

    mean, min, max, sum, stddev, var = dask.compute(
        zonal_extract.mean(),
        zonal_extract.min(),
        zonal_extract.max(),
        zonal_extract.sum(),
        zonal_extract.std(),
        zonal_extract.var(),
        num_workers=3,
    )

    results = {
        k: float(v)
        for k, v in dict(
            mean=mean, min=min, max=max, sum=sum, stddev=stddev, var=var
        ).items()
    }

但是如果有人有办法实际使用 dicts with dask 而不是三遍命名,我会很乐意接受这个答案

dask.compute 将为您递归成字典。

你可以这样写:

results = dict(
    mean=dask.delayed(zonal_extract.mean)(),
    min=dask.delayed(zonal_extract.min)()
    # and more
)

results = dask.compute(results)[0]

基本思想是您可以将延迟计算嵌套到传递给 dask.compute 的元组、列表、字典等中。这里需要的只是从函数调用中创建成熟的延迟对象。

我们可以在不重复方面更“高效”:


computations = {k: dask.delayed(getattr(zonal_extract, k))()
                for k in "mean min max sum std var".split()}
results = dask.compute(computations)[0]

如果我退一步说,我猜这个并行化水平太低了——这些都是聚合,在算术运算中不是那么密集,它们都遍历相同的数据来做它。 var 只是 std 的平方,从这个意义上说,加速更简单。

@creanion 的回答很好,但我还要指出,不需要包装 mean()var()stddev() 等操作dask.delayed 对象中的 etc:这些已经是惰性操作,因此您可以直接对它们调用 dask.compute()

所以没有 delayed 包装器的最小示例是:

import dask
import dask.array as da

# Generate some fake data
zonal_extract = da.random.uniform(size=(100,), chunks=10)

summary_stats = {
    "mean": zonal_extract.mean(),
    "std": zonal_extract.std(),
    "var": zonal_extract.var(),
    "min": zonal_extract.min(),
    "max": zonal_extract.max(),
}

# traverse=True is default, but being explicit
summary_stats_computed, = dask.compute(summary_stats, traverse=True)

产生(我的随机数滚动):

{'mean': 0.4903848677019127,
 'std': 0.30733105780457826,
 'var': 0.09445237909128101,
 'min': 0.000996718178509548,
 'max': 0.9981326789252434}