Dask HighLevelGraph 短路计算
Dask HighLevelGraph short circuit computing
我正在尝试获取一个 DataFrame ddf
和 return 一个与 ddf
相同的新 DataFrame,除非 ddf
有一个空分区,它应该指向最近的非空组件。例如,如果 ddf
有分区 [P1, P2, P3, P4, P5, P6]
,其中 P2
、P3
和 P6
是空的 Pandas DataFrame,那么它 returns 以下 Dask DataFrame:[P1, P1, P1, P4, P5, P5]
。我的密码是
name = 'prev-nonempty-' + tokenize(ddf)
meta = ddf._meta
dsk = dict()
def helper(A, B):
return B if A.empty else A
dsk[(name, 0)] = (helper, (ddf._name, 0), None)
for i in range(1, len(ddf.divisions)-1):
dsk[(name, i)] = (helper, (ddf._name, i), (name, i-1))
graph = HighLevelGraph.from_collections(name, dsk, dependencies=[ddf])
return new_dd_object(graph, name, meta, ddf.divisions)
我的问题是在Dask HighLevelGraphs中是否有一种方法可以进行短路计算,以便如果发现非空分区,第i个分区的计算会提前停止。
它说 here 那
In cases like (add, 'x', 'y')
, functions like add
receive concrete values instead of keys. A Dask scheduler replaces keys (like x
and y
) with their computed values (like 1
and 2
) before calling the add
function.
这表明你不能将它短路,但也许我可以使用更复杂的 Dask 调度程序技巧?
不,标准任务图无法做到这一点。但是,您可以将此逻辑融入您的函数本身。
def func(accumulator, new_data):
if is_done(accumulator):
return accumulator
所以您仍然 运行 完成所有任务,但在您满足条件后它们会非常快。
您也可以考虑使用 Dask Futures,但级别较低。 https://docs.dask.org/en/latest/futures.html
我正在尝试获取一个 DataFrame ddf
和 return 一个与 ddf
相同的新 DataFrame,除非 ddf
有一个空分区,它应该指向最近的非空组件。例如,如果 ddf
有分区 [P1, P2, P3, P4, P5, P6]
,其中 P2
、P3
和 P6
是空的 Pandas DataFrame,那么它 returns 以下 Dask DataFrame:[P1, P1, P1, P4, P5, P5]
。我的密码是
name = 'prev-nonempty-' + tokenize(ddf)
meta = ddf._meta
dsk = dict()
def helper(A, B):
return B if A.empty else A
dsk[(name, 0)] = (helper, (ddf._name, 0), None)
for i in range(1, len(ddf.divisions)-1):
dsk[(name, i)] = (helper, (ddf._name, i), (name, i-1))
graph = HighLevelGraph.from_collections(name, dsk, dependencies=[ddf])
return new_dd_object(graph, name, meta, ddf.divisions)
我的问题是在Dask HighLevelGraphs中是否有一种方法可以进行短路计算,以便如果发现非空分区,第i个分区的计算会提前停止。
它说 here 那
In cases like
(add, 'x', 'y')
, functions likeadd
receive concrete values instead of keys. A Dask scheduler replaces keys (likex
andy
) with their computed values (like1
and2
) before calling theadd
function.
这表明你不能将它短路,但也许我可以使用更复杂的 Dask 调度程序技巧?
不,标准任务图无法做到这一点。但是,您可以将此逻辑融入您的函数本身。
def func(accumulator, new_data):
if is_done(accumulator):
return accumulator
所以您仍然 运行 完成所有任务,但在您满足条件后它们会非常快。
您也可以考虑使用 Dask Futures,但级别较低。 https://docs.dask.org/en/latest/futures.html