带有 numpy 数组的 dask bag foldby

dask bag foldby with numpy arrays

当我在包含 numpy 数组的 dask.bag 上执行 foldby 时,我从 dask / numpy 收到一条非常无用的 FutureWarning 消息。

def binop(a, b):
    print('binop')
    return a + b[1]

def combine(a, b):
    print('combine')
    return a + b[1]

seq = ((np.random.randint(0, 5, size=1)[0], np.ones(5,)) for _ in range(50))
db.from_sequence(seq, partition_size=10)\
    .foldby(0, binop=binop, initial=np.zeros(5,), combine=combine)\
    .compute()

目的只是把一堆NumPy数组加起来。这会产生正确的结果,但也会从 NumPy 产生许多 FutureWarning 消息(看起来每个分区一个),尽管它们看起来好像来自 dask.

dask/async.py:247: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison return func(*args2)

只是将两个没有 dasknumpy 数组相加不会产生结果,所以这里显然与并行 .foldby 有一些关系。看起来警告是在任何计算完成之前产生的。

我正在使用 python 3.6 dask 0.14.1 和 numpy 1.12.1

dask.bag.foldby


更新

感谢@MRocklin 的回答,我开始对此进行更多研究。所以 dask.async.py 中的违规代码是 this

def _execute_task(arg, cache, dsk=None):
....
    if isinstance(arg, list):
        return [_execute_task(a, cache) for a in arg]
    elif istask(arg):
        func, args = arg[0], arg[1:]
        args2 = [_execute_task(a, cache) for a in args]
        return func(*args2)

是否有可能 dask 实际上正在尝试遍历 args2 = [_execute_task(a, cache) for a in args] 中的 numpy 数组,我不太了解内部(事实上根本不了解)判断那些变量包含什么。

这个警告确实来自numpy。快速搜索代码库会得到 these lines:

    if (!strcmp(ufunc_name, "equal") ||
            !strcmp(ufunc_name, "not_equal")) {
        /* Warn on non-scalar, return NotImplemented regardless */
        assert(nin == 2);
        if (PyArray_NDIM(out_op[0]) != 0 ||
                PyArray_NDIM(out_op[1]) != 0) {
            if (DEPRECATE_FUTUREWARNING(
                    "elementwise comparison failed; returning scalar "
                    "instead, but in the future will perform elementwise "
                    "comparison") < 0) {
                return -1;
            }
        }

Dask 可能会使情况变得更糟,因为您会在每个进程中收到一次警告(dask.bag 默认情况下使用进程池)。

此外,如果您的计算受 numpy 约束,那么您可能会考虑切换到线程调度程序而不是多进程调度程序

mybag.compute(get=dask.threaded.get)

http://dask.pydata.org/en/latest/scheduler-choice.html

这与cytoolz.itetoolz.reducebyinit值有关。将 init 从 init=np.zeros((5,)) 更改为 init=lambda: np.zeros((5,)) 至少可以消除警告消息。

警告由 this line

产生
cpdef dict reduceby(object key, object binop, object seq, object init='__no__default__'):
...
    cdef bint skip_init = init == no_default

将传入的初始值 (np.zeros((5,))) 与字符串 "__no__default__" 进行比较,导致 numpy 无法通过 carray 和 [=19= 的元素明智比较].

所以回答我自己的问题:

  • 不,您不必担心那个警告,但它可能会在将来减慢程序速度
  • 通过使用可调用对象作为 init
  • 来完全避免警告
  • 这似乎不会产生任何重大的负面影响,但请记住,init 可调用函数将在每个执行程序进程中调用一次