在 dask 中收集未知长度的序列
Gathering a sequence of unknown length in dask
我想使用 dask 实现并行优化算法。我的目标是:
- 主优化循环应该 运行 在一个工作器上。
- 预先不知道优化步骤的数量,优化步骤必须产生其他任务。
- 中间结果应该是可审计的,这样我就可以监控正在发生的事情。
满足上述所有条件的示例代码是:
from time import sleep
from distributed import Client, get_client
def f(x):
sleep(0.5)
return (x - 1)**3
def derivative(x):
sleep(1)
return 3 * (x - 1)**2
def newton_optimization(x, fval, dfdx):
if abs(fval) < 1e-10:
return x, None
x = x - fval / dfdx
client = get_client()
fval = client.submit(f, x)
dfdx = client.submit(derivative, x)
next_step = client.submit(newton_optimization, x, fval, dfdx)
return x, next_step
client = Client()
task = client.submit(newton_optimization, 0, 1, 3)
while task is not None:
i, task = task.result()
print(i)
client.shutdown()
但是感觉并不优雅,例如,为了检查优化的当前状态,我需要从头开始一直跟踪结果链。有没有更好的方法?
一个快速的解决方案是使用 adaptive,这听起来很像您感兴趣的内容,并且具有非常酷的进度可视化效果。 (更新:糟糕,刚刚注意到有一个 adaptive 的贡献者与您同名,所以您很有可能知道这个包。)
对您的代码的一些建议:
- 现在计算是串行的:while 循环阻塞直到每个
task
被计算(由于 .result
)。更好的方法可能是使用 .map
提交多个值进行评估。
from dask.distributed import as_completed
# use context manager to avoid .shutdown
with Client() as client:
# submit multiple values with .map
# the sample values might need to be chosen with care for better efficiency
tasks = client.map(newton_optimization, [0, 0.1, 0.2], [1, 1.1, 1.2], [3, 3.1, 3.2])
# add to the task list as new tasks are spawned
for task in as_completed(tasks):
if task is not None:
tasks.append(task.result()[1]) # this could be refactored
- 多个起点可能会在某个点收敛,因此为了减少被评估的非常相似的值的数量,在生成新任务时合并一些舍入可能是可以接受的,例如如果您有多个任务正在评估相差
1e-5
的值,那么一旦达到可容忍的精度,添加 np.round(x, 1e-5)
可能会减少并行计算的数量。
也许您可以使用 Queue or some other coordination primitive 来跟踪中间体。
另外,在these Dask docs的基础上,这里使用worker_client()
上下文管理器会更加稳定。
所以,它会是这样的:
from distributed import worker_client, Queue
def newton_optimization(x, fval, dfdx):
if abs(fval) < 1e-10:
return x
with worker_client() as client:
x = x - fval / dfdx
fval = client.submit(f, x)
dfdx = client.submit(derivative, x)
next_step = client.submit(newton_optimization, x, fval, dfdx)
queue.put(next_step)
return x
queue = Queue()
while True:
future = queue.get()
print(future.result())
请注意,通常最好避免从任务中启动任务,因为这可能会导致可靠性问题。也就是说,看起来您的工作流程需要这个,所以我只想分享一下这个功能在 Dask 中得到了很好的支持(尽管文档说它是实验性的——文档需要更新)但确实希望这里有一些细微差别。 :)
感谢您提出这个问题,它有助于在这里展开良好的讨论:https://github.com/dask/distributed/issues/5671
我想使用 dask 实现并行优化算法。我的目标是:
- 主优化循环应该 运行 在一个工作器上。
- 预先不知道优化步骤的数量,优化步骤必须产生其他任务。
- 中间结果应该是可审计的,这样我就可以监控正在发生的事情。
满足上述所有条件的示例代码是:
from time import sleep
from distributed import Client, get_client
def f(x):
sleep(0.5)
return (x - 1)**3
def derivative(x):
sleep(1)
return 3 * (x - 1)**2
def newton_optimization(x, fval, dfdx):
if abs(fval) < 1e-10:
return x, None
x = x - fval / dfdx
client = get_client()
fval = client.submit(f, x)
dfdx = client.submit(derivative, x)
next_step = client.submit(newton_optimization, x, fval, dfdx)
return x, next_step
client = Client()
task = client.submit(newton_optimization, 0, 1, 3)
while task is not None:
i, task = task.result()
print(i)
client.shutdown()
但是感觉并不优雅,例如,为了检查优化的当前状态,我需要从头开始一直跟踪结果链。有没有更好的方法?
一个快速的解决方案是使用 adaptive,这听起来很像您感兴趣的内容,并且具有非常酷的进度可视化效果。 (更新:糟糕,刚刚注意到有一个 adaptive 的贡献者与您同名,所以您很有可能知道这个包。)
对您的代码的一些建议:
- 现在计算是串行的:while 循环阻塞直到每个
task
被计算(由于.result
)。更好的方法可能是使用.map
提交多个值进行评估。
from dask.distributed import as_completed
# use context manager to avoid .shutdown
with Client() as client:
# submit multiple values with .map
# the sample values might need to be chosen with care for better efficiency
tasks = client.map(newton_optimization, [0, 0.1, 0.2], [1, 1.1, 1.2], [3, 3.1, 3.2])
# add to the task list as new tasks are spawned
for task in as_completed(tasks):
if task is not None:
tasks.append(task.result()[1]) # this could be refactored
- 多个起点可能会在某个点收敛,因此为了减少被评估的非常相似的值的数量,在生成新任务时合并一些舍入可能是可以接受的,例如如果您有多个任务正在评估相差
1e-5
的值,那么一旦达到可容忍的精度,添加np.round(x, 1e-5)
可能会减少并行计算的数量。
也许您可以使用 Queue or some other coordination primitive 来跟踪中间体。
另外,在these Dask docs的基础上,这里使用worker_client()
上下文管理器会更加稳定。
所以,它会是这样的:
from distributed import worker_client, Queue
def newton_optimization(x, fval, dfdx):
if abs(fval) < 1e-10:
return x
with worker_client() as client:
x = x - fval / dfdx
fval = client.submit(f, x)
dfdx = client.submit(derivative, x)
next_step = client.submit(newton_optimization, x, fval, dfdx)
queue.put(next_step)
return x
queue = Queue()
while True:
future = queue.get()
print(future.result())
请注意,通常最好避免从任务中启动任务,因为这可能会导致可靠性问题。也就是说,看起来您的工作流程需要这个,所以我只想分享一下这个功能在 Dask 中得到了很好的支持(尽管文档说它是实验性的——文档需要更新)但确实希望这里有一些细微差别。 :)
感谢您提出这个问题,它有助于在这里展开良好的讨论:https://github.com/dask/distributed/issues/5671