确定 dask 计算某事的次数

Determine how many times dask computed something

问题

我想知道是否可以使用 dask(特别是 dask 数组)知道是否以及何时计算了某些内容。我正在考虑单元测试,想知道 dask 计算了一个数组多少次。类似于知道它们被调用了多少次的模拟对象。这样的东西已经存在了吗?如果没有,有没有比制作 custom callback 更好的方法?如果这不存在,dask 核心开发人员是否有兴趣将其添加到核心 dask 中进行测试?

非常感谢任何帮助。

详情

假设我有一个函数,它接受一个 xarray DataArray,对它做一些事情,然后 returns 它。在某些情况下,dask 数组被隐式转换为 numpy 数组,包括新的 dask 用户不知道最好的 dask 友好方式来做某事。我想编写我的单元测试以确保我或其他贡献者不会意外损害功能的性能。考虑到测试数据通常是真实世界案例的 simplified/small 版本,并且在这些情况下可能看不到多次计算 dask 数组的性能影响,这一点尤其重要。

编辑:解决方案

这是我根据 MRocklin 的回答最终做的简单解决方案。

class CustomScheduler(object):
    def __init__(self, max_computes=1):
        self.max_computes = max_computes
        self.total_computes = 0

    def __call__(self, dsk, keys, **kwargs):
        self.total_computes += 1
        if self.total_computes > self.max_computes:
            raise RuntimeError("Too many dask computations were scheduled: {}".format(self.total_computes))
        return dask.get(dsk, keys, **kwargs)

然后我这样使用它:

with dask.config.set(scheduler=CustomScheduler(0)):
    # dask array stuff

执行时触发的方式有很多种。

一种方法是指定自定义调度程序:

def my_scheduler(dsk, keys, **kwargs):
    print('computing!')
    return dask.get(dsk, keys, **kwargs)

with dask.config.set(scheduler=my_scheduler):
    ...

Custom callbacks,像你建议的那样也很容易实现。

如果您只使用 dask 数组,那么您可以查看 array plugins

测试套件中还使用了多种其他方法。