Dask: handling failures in delayed tasks

How can I port the following function to dask to parallelize it?

from time import sleep

from dask import delayed
from dask.distributed import Client
from tqdm import tqdm

client = Client(n_workers=4)  # local cluster with 4 workers

# linear
things = [1,2,3]
_x = []
_y = []

def my_slow_function(foo):
    sleep(2)
    x = foo
    y = 2 * foo
    assert y < 5
    return x, y

for foo in tqdm(things):
    try:
        x_v, y_v = my_slow_function(foo)
        _x.append(x_v)
        if y_v is not None: _y.append(y_v)
    except AssertionError:
        print(f'failed: {foo}')

X = _x
y = _y

print(X)
print(y)

I am particularly unsure how to handle state and failures with delayed futures.

So far I only have:

from dask.diagnostics import ProgressBar
ProgressBar().register()

@delayed(nout=2)
def my_slow_function(foo):
    sleep(2)
    x = foo
    y = 2 * foo
    assert y < 5
    return x, y


for foo in tqdm(things):
    try:
        x_v, y_v = delayed(my_slow_function(foo))
        _x.append(x_v)
        if y_v is not None: _y.append(y_v)
    except AssertionError:
        print(f'failed: {foo}')

X = _x
y = _y

print(X)
print(y)

delayed(sum)(X).compute()
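In the delayed version, the try/except inside the loop never fires, because calling the decorated my_slow_function(foo) only builds a task. The assertion is raised later, inside compute(). Below is a minimal sketch that assumes it is acceptable to catch the error inside the task itself. The wrapper safe_call is hypothetical and turns a failed input into None, and the results are filtered after compute(). For illustration, sleep(2) is dropped and the synchronous scheduler is used so the example runs without a cluster.

```python
import dask
from dask import delayed


def my_slow_function(foo):
    # Same logic as the question, minus the sleep(2)
    x = foo
    y = 2 * foo
    assert y < 5
    return x, y


@delayed
def safe_call(foo):
    # Hypothetical wrapper: runs inside the task and turns a failure into None
    try:
        return my_slow_function(foo)
    except AssertionError:
        return None


things = [1, 2, 3]
tasks = [safe_call(foo) for foo in things]  # builds tasks, nothing runs yet

# The graph only executes here; a failed input now yields None instead of raising
results = dask.compute(*tasks, scheduler="synchronous")

X, y, failed = [], [], []
for foo, res in zip(things, results):
    if res is None:
        failed.append(foo)
    else:
        x_v, y_v = res
        X.append(x_v)
        y.append(y_v)

print(X, y, failed)
```

Because every task returns normally, a single failed input no longer aborts the whole compute(), and the failures are still reported per input.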

But:

Edit:

futures = client.map(my_slow_function, things)
results = client.gather(futures)

obviously fails, because the exception is no longer handled. So far I am not sure what the correct way is to catch exceptions raised inside dask.
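Client.gather re-raises a worker's exception on the client, which is why the script stops. If losing track of the failed inputs is acceptable, Client.gather also accepts errors="skip", which leaves erred futures out of the returned list. Below is a minimal sketch. It assumes the plain, undecorated my_slow_function, since client.map should receive an ordinary function rather than the @delayed one, and it uses a small thread-based local cluster for illustration.

```python
from dask.distributed import Client


def my_slow_function(foo):
    # Plain (undecorated) function from the question, minus the sleep(2)
    x = foo
    y = 2 * foo
    assert y < 5
    return x, y


things = [1, 2, 3]

# processes=False: workers run as threads in this process (enough for a local test)
with Client(processes=False, n_workers=1) as client:
    futures = client.map(my_slow_function, things)
    # errors="skip" drops erred futures instead of re-raising their exception
    results = client.gather(futures, errors="skip")

X = [x_v for x_v, _ in results]
y = [y_v for _, y_v in results]
print(X, y)
```

The trade-off is that the output no longer records which inputs failed. If you need that information, inspect the individual futures instead.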

Possibly related: How to prevent dask client from dying on worker exception?

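One of dask's design goals is to cancel the whole task graph when a task fails, so compute() on a delayed graph raises instead of returning partial results. Instead, use the concurrent futures API (https://docs.dask.org/en/latest/futures.html), which lets you handle failures on the driver (client) side: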

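from dask.distributed import wait

# my_slow_function must be the plain function here, not the @delayed version
futures = client.map(my_slow_function, things)
wait(futures)  # block until every future has finished or erred

for foo, f in zip(things, futures):
    try:
        x_v, y_v = f.result()  # re-raises the worker's exception on the client
        _x.append(x_v)
        _y.append(y_v)
    except AssertionError:
        # implement logging here
        print(f'failed: {foo}')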
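A variant of the approach above processes each future as soon as it finishes. Instead of waiting for all of them, it uses dask.distributed.as_completed, and it checks Future.status rather than wrapping result() in a try/except. This lets you log the failing input together with its exception. The sketch below makes the same assumptions as before: a plain my_slow_function and a thread-based local cluster chosen for illustration. The names ok, failed and inputs are illustrative only.

```python
from dask.distributed import Client, as_completed


def my_slow_function(foo):
    # Plain function from the question, minus the sleep(2)
    x = foo
    y = 2 * foo
    assert y < 5
    return x, y


things = [1, 2, 3]
ok, failed = {}, {}

with Client(processes=False, n_workers=1) as client:
    futures = client.map(my_slow_function, things)
    # Map each future's key back to the input that produced it
    inputs = {f.key: foo for f, foo in zip(futures, things)}
    # as_completed yields futures in completion order, whether they succeeded or erred
    for f in as_completed(futures):
        foo = inputs[f.key]
        if f.status == "error":
            failed[foo] = repr(f.exception())  # log the worker-side exception
        else:
            ok[foo] = f.result()

# Restore the original input order for the successful results
X = [ok[foo][0] for foo in things if foo in ok]
y = [ok[foo][1] for foo in things if foo in ok]
print(X, y, sorted(failed))
```

Because results arrive in completion order, they are collected in dicts keyed by input and then reordered. This keeps X and y aligned with the sequential version.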