为什么有时我必须在 dask 延迟函数上调用 compute() 两次?

Why sometimes do I have to call compute() twice on dask delayed functions?

我正在使用 dask 延迟函数,并且我越来越熟悉在函数上使用 @dask.delayed 装饰器时的注意事项。我意识到有时我需要调用 compute() 两次才能得到结果,尽管我认为我遵循了最佳实践。即不要在另一个 dask 延迟函数中调用 dask 延迟函数。

我在两种情况下 运行 遇到过这个问题:有嵌套函数时,以及在 class 中调用使用延迟的 class 成员的成员函数时对象。

@dask.delayed
def add(a, b):
    return  a + b

def inc(a):
    return add(a, 1)

@dask.delayed
def foo(x):
    return inc(x)

x = foo(3)
x.compute()
class Add():
    def __init__(self, a, b):
        self.a = a
        self.b = b

    @dask.delayed
    def calc(self):
        return self.a+self.b

a = dask.delayed(1)
b = dask.delayed(2)
add = Add(a, b)
add.calc().compute()

在第一个示例中,x.compute() 不是 return 结果,而是另一个延迟对象,我必须调用 x.compute().compute() 才能获得实际结果。但我相信 inc 不是延迟函数,因此不违反不在另一个延迟函数中调用延迟函数的规则?

在第二个示例中,我将不得不再次调用 add.calc().compute().compute() 以获得实际结果。在这种情况下 self.aself.b 只是延迟属性,任何地方都没有嵌套的延迟函数。

任何人都可以帮助我理解为什么在这两种情况下我需要两次调用 compute() 吗?或者更好的是,有人可以简要解释一下使用 dask 延迟函数时的一般 'rule' 吗?我阅读了文档,在那里找不到太多内容。

更新: @malbert 指出这些示例需要调用 compute() 两次,因为延迟函数涉及延迟结果,因此它算作 'calling delayed function within another delayed function'。但是为什么像下面这样的东西只需要调用一次 compute() 呢?

@dask.delayed
def add(a,b):
    return a+b

a = dask.delayed(1)
b = dask.delayed(2)
c = add(a,b)
c.compute()

本例中,ab也是延迟结果,用在延迟函数中。我的随机猜测是真正重要的是延迟结果在延迟函数中的位置?可能只有将它们作为参数传入才好?

我认为关键在于更准确地理解 dask.delayed 的作用。

考虑

my_delayed_function = dask.delayed(my_function)

当在 my_function 上用作装饰器时,dask.delayed return 是一个函数 my_delayed_function,它会延迟 my_function 的执行。当使用参数

调用 my_delayed_function
delayed_result = my_delayed_function(arg)

这个 return 是一个对象,其中包含有关执行 my_function 的所有必要信息,参数为 arg

呼叫

result = delayed_result.compute()

触发函数的执行。

现在,在两个延迟结果上使用 + 等运算符的效果是,新的延迟结果是 returned,它将输入中包含的两个执行捆绑在一起。在此对象上调用 compute 会触发这一系列执行。


到目前为止一切顺利。现在,在您的第一个示例中,foo 调用 inc,后者调用延迟函数,return 是延迟结果。因此,计算 foo 正是这样做的,而 return 就是这个延迟的结果。对此延迟结果调用 compute(您的 "second" 计算)然后触发其计算。

在您的第二个示例中,ab 是延迟结果。添加两个延迟结果使用 + returns 捆绑执行 ab 及其添加的延迟结果。现在,由于 calc 是延迟函数,因此 return 是获得延迟结果的延迟结果。因此,它的计算将再次 return 延迟对象。

在这两种情况下,您都没有完全遵循 best practices。具体点

Avoid calling delayed within delayed functions

因为在您的第一个示例中延迟的 addinc 中被调用,它在 foo 中被调用。因此,您在延迟 foo 内调用延迟。在您的第二个示例中,延迟的 calc 正在处理延迟的 ab,因此您再次在延迟函数中调用延迟。

在你的问题中,你说

But I believe inc is not a delayed function and therefore it's not against the rule of not calling a delayed function within another delayed function?

我怀疑您可能对 "calling delayed within delayed functions" 的理解有误。这指的是函数内发生的所有事情,因此是函数的一部分:inc 包括延迟 add 的调用,因此延迟在 foo.

中被调用

问题更新后的补充:将延迟参数传递给延迟函数会将延迟执行捆绑到新的延迟结果中。这与 "calling delayed within the delayed function" 不同,是预期用例的一部分。实际上我也没有在文档中找到对此的明确解释,但一个切入点可能是 this: unpack_collections is used to process delayed arguments。即使这仍然有些不清楚,坚持最佳实践(以这种方式解释)应该会产生关于 compute().

输出的可重现行为。

以下代码在坚持 "Avoid calling delayed within delayed functions" 和 return 时会产生单次调用 compute 后的结果:

第一个例子:

#@dask.delayed
def add(a, b):
    return  a + b

def inc(a):
    return add(a, 1)

@dask.delayed
def foo(x):
    return inc(x)

x = foo(3)
x.compute()

第二个例子:

class Add():
    def __init__(self, a, b):
        self.a = a
        self.b = b

    #@dask.delayed
    def calc(self):
        return self.a+self.b

a = dask.delayed(1)
b = dask.delayed(2)
add = Add(a, b)
add.calc().compute()