Dask HighLevelGraph 短路计算

Dask HighLevelGraph short circuit computing

我正在尝试获取一个 DataFrame ddf 和 return 一个与 ddf 相同的新 DataFrame,除非 ddf 有一个空分区,它应该指向最近的非空组件。例如,如果 ddf 有分区 [P1, P2, P3, P4, P5, P6],其中 P2P3P6 是空的 Pandas DataFrame,那么它 returns 以下 Dask DataFrame:[P1, P1, P1, P4, P5, P5]。我的密码是

name = 'prev-nonempty-' + tokenize(ddf)
meta = ddf._meta
dsk = dict()
def helper(A, B):
  return B if A.empty else A
dsk[(name, 0)] = (helper, (ddf._name, 0), None)
for i in range(1, len(ddf.divisions)-1):
    dsk[(name, i)] = (helper, (ddf._name, i), (name, i-1))
graph = HighLevelGraph.from_collections(name, dsk, dependencies=[ddf])
return new_dd_object(graph, name, meta, ddf.divisions)

我的问题是在Dask HighLevelGraphs中是否有一种方法可以进行短路计算,以便如果发现非空分区,第i个分区的计算会提前停止。

它说 here

In cases like (add, 'x', 'y'), functions like add receive concrete values instead of keys. A Dask scheduler replaces keys (like x and y) with their computed values (like 1 and 2) before calling the add function.

这表明你不能将它短路,但也许我可以使用更复杂的 Dask 调度程序技巧?

不,标准任务图无法做到这一点。但是,您可以将此逻辑融入您的函数本身。

def func(accumulator, new_data):
    if is_done(accumulator):
        return accumulator 

所以您仍然 运行 完成所有任务,但在您满足条件后它们会非常快。

您也可以考虑使用 Dask Futures,但级别较低。 https://docs.dask.org/en/latest/futures.html