dask.compute() 中的重试不清楚
Retries in dask.compute() are unclear
根据文档,Number of allowed automatic retries if computing a result fails.
"result" 是指每个单独的任务还是整个 compute() 调用?
如果是指整个调用,dask.delayed中每个任务如何实现重试?
此外,根据以下代码,我不确定重试是否有效。
import dask
import random
@dask.delayed
def add(x, y):
return x + y
@dask.delayed
def divide(sum_i):
n = random.randint(0, 1)
result = sum_i / n
return result
tasks = []
for i in range(3):
sum_i = add(i, i+1)
divide_n = divide(sum_i)
tasks.append(divide_n)
dask.compute(*tasks, retries=1000)
预期输出为 (1, 3, 5),实际为 ZeroDivisionError。
如果有人感兴趣,我们可以为任务使用@retry 装饰器,如下所示:
@dask.delayed
@retry(Exception, tries=3, delay=5)
def my_func():
pass
重试装饰器:
from functools import wraps
def retry(exceptions, tries=4, delay=3, backoff=2, logger=None):
"""
Retry calling the decorated function using an exponential backoff.
Args:
exceptions: The exception to check. may be a tuple of
exceptions to check.
tries: Number of times to try (not retry) before giving up.
delay: Initial delay between retries in seconds.
backoff: Backoff multiplier (e.g. value of 2 will double the delay
each retry).
logger: Logger to use.
"""
if not logger:
logger = logging.getLogger(__name__)
def deco_retry(f):
@wraps(f)
def f_retry(*args, **kwargs):
mtries, mdelay = tries, delay
while mtries > 1:
try:
return f(*args, **kwargs)
except exceptions as e:
msg = f"{e}, \nRetrying in {mdelay} seconds..."
logger.warning(msg)
sleep(mdelay)
mtries -= 1
mdelay *= backoff
return f(*args, **kwargs)
return f_retry # true decorator
return deco_retry
根据文档,Number of allowed automatic retries if computing a result fails.
"result" 是指每个单独的任务还是整个 compute() 调用?
如果是指整个调用,dask.delayed中每个任务如何实现重试?
此外,根据以下代码,我不确定重试是否有效。
import dask
import random
@dask.delayed
def add(x, y):
return x + y
@dask.delayed
def divide(sum_i):
n = random.randint(0, 1)
result = sum_i / n
return result
tasks = []
for i in range(3):
sum_i = add(i, i+1)
divide_n = divide(sum_i)
tasks.append(divide_n)
dask.compute(*tasks, retries=1000)
预期输出为 (1, 3, 5),实际为 ZeroDivisionError。
如果有人感兴趣,我们可以为任务使用@retry 装饰器,如下所示:
@dask.delayed
@retry(Exception, tries=3, delay=5)
def my_func():
pass
重试装饰器:
from functools import wraps
def retry(exceptions, tries=4, delay=3, backoff=2, logger=None):
"""
Retry calling the decorated function using an exponential backoff.
Args:
exceptions: The exception to check. may be a tuple of
exceptions to check.
tries: Number of times to try (not retry) before giving up.
delay: Initial delay between retries in seconds.
backoff: Backoff multiplier (e.g. value of 2 will double the delay
each retry).
logger: Logger to use.
"""
if not logger:
logger = logging.getLogger(__name__)
def deco_retry(f):
@wraps(f)
def f_retry(*args, **kwargs):
mtries, mdelay = tries, delay
while mtries > 1:
try:
return f(*args, **kwargs)
except exceptions as e:
msg = f"{e}, \nRetrying in {mdelay} seconds..."
logger.warning(msg)
sleep(mdelay)
mtries -= 1
mdelay *= backoff
return f(*args, **kwargs)
return f_retry # true decorator
return deco_retry