在 dask 中收集未知长度的序列

Gathering a sequence of unknown length in dask

我想使用 dask 实现并行优化算法。我的目标是:

满足上述所有条件的示例代码是:

from time import sleep
from distributed import Client, get_client


def f(x):
    sleep(0.5)
    return (x - 1)**3


def derivative(x):
    sleep(1)
    return 3 * (x - 1)**2


def newton_optimization(x, fval, dfdx):
    if abs(fval) < 1e-10:
        return x, None
    x = x - fval / dfdx
    client = get_client()
    fval = client.submit(f, x)
    dfdx = client.submit(derivative, x)
    next_step = client.submit(newton_optimization, x, fval, dfdx)
    return x, next_step


client = Client()
task = client.submit(newton_optimization, 0, 1, 3)

while task is not None:
    i, task = task.result()
    print(i)

client.shutdown()

但是感觉并不优雅,例如,为了检查优化的当前状态,我需要从头开始一直跟踪结果链。有没有更好的方法?

一个快速的解决方案是使用 adaptive,这听起来很像您感兴趣的内容,并且具有非常酷的进度可视化效果。 (更新:糟糕,刚刚注意到有一个 adaptive 的贡献者与您同名,所以您很有可能知道这个包。)

对您的代码的一些建议:

  • 现在计算是串行的:while 循环阻塞直到每个 task 被计算(由于 .result)。更好的方法可能是使用 .map 提交多个值进行评估。
from dask.distributed import as_completed

# use context manager to avoid .shutdown
with Client() as client:

    # submit multiple values with .map
    # the sample values might need to be chosen with care for better efficiency
    tasks = client.map(newton_optimization, [0, 0.1, 0.2], [1, 1.1, 1.2], [3, 3.1, 3.2]) 

    # add to the task list as new tasks are spawned
    for task in as_completed(tasks):
        if task is not None:
            tasks.append(task.result()[1]) # this could be refactored
  • 多个起点可能会在某个点收敛,因此为了减少被评估的非常相似的值的数量,在生成新任务时合并一些舍入可能是可以接受的,例如如果您有多个任务正在评估相差 1e-5 的值,那么一旦达到可容忍的精度,添加 np.round(x, 1e-5) 可能会减少并行计算的数量。

也许您可以使用 Queue or some other coordination primitive 来跟踪中间体。

另外,在these Dask docs的基础上,这里使用worker_client()上下文管理器会更加稳定。

所以,它会是这样的:

from distributed import worker_client, Queue

def newton_optimization(x, fval, dfdx):
    if abs(fval) < 1e-10:
        return x
    with worker_client() as client:
        x = x - fval / dfdx
        fval = client.submit(f, x)
        dfdx = client.submit(derivative, x)
        next_step = client.submit(newton_optimization, x, fval, dfdx)
        queue.put(next_step)
    return x

queue = Queue()

while True:
    future = queue.get()
    print(future.result())

请注意,通常最好避免从任务中启动任务,因为这可能会导致可靠性问题。也就是说,看起来您的工作流程需要这个,所以我只想分享一下这个功能在 Dask 中得到了很好的支持(尽管文档说它是实验性的——文档需要更新)但确实希望这里有一些细微差别。 :)

感谢您提出这个问题,它有助于在这里展开良好的讨论:https://github.com/dask/distributed/issues/5671