Dask: handling failures in delayed tasks
How can I port the following function to dask to parallelize it?
from time import sleep
from dask.distributed import Client
from dask import delayed
from tqdm import tqdm

client = Client(n_workers=4)
tqdm.pandas()

# linear
things = [1, 2, 3]
_x = []
_y = []

def my_slow_function(foo):
    sleep(2)
    x = foo
    y = 2 * foo
    assert y < 5
    return x, y

for foo in tqdm(things):
    try:
        x_v, y_v = my_slow_function(foo)
        _x.append(x_v)
        if y_v is not None:
            _y.append(y_v)
    except AssertionError:
        print(f'failed: {foo}')

X = _x
y = _y
print(X)
print(y)
I am particularly unsure how to handle state and failures with delayed objects.
So far I only have:
from dask.diagnostics import ProgressBar
ProgressBar().register()

@delayed(nout=2)
def my_slow_function(foo):
    sleep(2)
    x = foo
    y = 2 * foo
    assert y < 5
    return x, y

for foo in tqdm(things):
    try:
        x_v, y_v = delayed(my_slow_function(foo))
        _x.append(x_v)
        if y_v is not None:
            _y.append(y_v)
    except AssertionError:
        print(f'failed: {foo}')

X = _x
y = _y
print(X)
print(y)
delayed(sum)(X).compute()
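But:
- the try/except no longer works, i.e. exceptions are no longer caught
- I end up with two lists of delayed objects, not two lists of computed values
- for these two lists, I am not sure how to call compute without computing the results twice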
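The try/except in the loop never fires because calling a delayed function only builds the task graph; the assertion runs later, inside compute(). A minimal sketch of one way around all three points, assuming it is acceptable to catch the error inside the task itself: the hypothetical wrapper `safe_slow_function` turns a failure into a `None` marker, and a single `dask.compute` call evaluates every task once, after which the tuples are split into two plain lists. It uses the local threaded scheduler and drops the `sleep(2)` purely to keep the demo short.

```python
import dask
from dask import delayed


def my_slow_function(foo):
    x = foo
    y = 2 * foo
    assert y < 5
    return x, y


@delayed
def safe_slow_function(foo):
    # Hypothetical wrapper: catch the failure inside the task and return None
    # instead of letting the exception abort the whole compute() call.
    try:
        return my_slow_function(foo)
    except AssertionError:
        return None


things = [1, 2, 3]
tasks = [safe_slow_function(foo) for foo in things]  # builds the graph only

# One dask.compute call runs every task exactly once and returns a tuple.
results = dask.compute(*tasks, scheduler="threads")

X, y, failed = [], [], []
for foo, res in zip(things, results):
    if res is None:
        failed.append(foo)
        print(f"failed: {foo}")
    else:
        x_v, y_v = res
        X.append(x_v)
        y.append(y_v)

print(X)  # [1, 2]
print(y)  # [2, 4]
```

Splitting the tuples after a single compute avoids running anything twice. Passing two lists of `Delayed` objects to one `dask.compute(X, y)` call would also evaluate shared tasks only once, but without the wrapper a single failing task would still make that call raise.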
Edit
futures = client.map(my_slow_function, things)
results = client.gather(futures)
This obviously fails because the exception is no longer handled, but I am not yet sure what the correct way to catch it from dask is.
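For the `client.gather` call specifically, `Client.gather` accepts an `errors` argument (`"raise"` by default, or `"skip"`). A minimal sketch, assuming an in-process demo cluster (`processes=False`, `n_workers=2`, `threads_per_worker=1` are illustrative choices, not requirements) and the plain, undecorated `my_slow_function`. The final filter also drops any `None` placeholders, so the sketch does not depend on exactly how skipped entries are represented in the returned list.

```python
from dask.distributed import Client


def my_slow_function(foo):
    x = foo
    y = 2 * foo
    assert y < 5
    return x, y


things = [1, 2, 3]

# processes=False runs the workers as threads in this process (handy for a demo)
with Client(processes=False, n_workers=2, threads_per_worker=1) as client:
    futures = client.map(my_slow_function, things)
    # errors="skip" makes gather tolerate failed futures instead of raising
    gathered = client.gather(futures, errors="skip")

# Keep only successful (x, y) tuples
results = [r for r in gathered if r is not None]
X = [x_v for x_v, _ in results]
y = [y_v for _, y_v in results]
print(X)
print(y)
```

The trade-off is that skipping silently discards the failures; if you need to know which inputs failed, inspect the futures individually instead.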
Possibly related: How to prevent dask client from dying on worker exception?
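One of dask's design goals is to cancel the whole task graph when a task fails. Instead, use the concurrent futures API (https://docs.dask.org/en/latest/futures.html), which lets you handle failures on the driver: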
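from dask.distributed import wait

# client.map needs the plain function, not the @delayed-decorated version
futures = client.map(my_slow_function, things)
wait(futures)  # block until every task has finished or failed

X, y = [], []
for foo, f in zip(things, futures):
    try:
        # result() re-raises the worker's exception here, on the driver
        x_v, y_v = f.result()
        X.append(x_v)
        y.append(y_v)
    except AssertionError:
        # implement logging here
        print(f'failed: {foo}')

print(X)
print(y)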
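If you would rather handle results as they arrive instead of waiting for all of them, `dask.distributed.as_completed` yields each future as soon as it finishes. A minimal sketch, assuming the same in-process demo cluster settings (illustrative only) and the plain `my_slow_function`; the `foo_by_key` mapping is a hypothetical helper that maps each future back to its input, and the final lists are rebuilt in input order because completion order is not guaranteed.

```python
from dask.distributed import Client, as_completed


def my_slow_function(foo):
    x = foo
    y = 2 * foo
    assert y < 5
    return x, y


things = [1, 2, 3]
ok, failed = {}, {}

with Client(processes=False, n_workers=2, threads_per_worker=1) as client:
    futures = client.map(my_slow_function, things)
    # Hypothetical helper: map each future's key back to its input value
    foo_by_key = {fut.key: foo for fut, foo in zip(futures, things)}
    # as_completed yields futures in the order they finish, failed ones included
    for fut in as_completed(futures):
        foo = foo_by_key[fut.key]
        try:
            ok[foo] = fut.result()  # re-raises the worker's exception here
        except AssertionError as exc:
            failed[foo] = exc
            print(f"failed: {foo}")

# Rebuild the two lists in input order
X = [ok[foo][0] for foo in things if foo in ok]
y = [ok[foo][1] for foo in things if foo in ok]
print(X)
print(y)
```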