Dask job queue design pattern?

Suppose I have a simple costly function that stores some results to a file:

import time

def costly_function(filename):
    time.sleep(10)  # stand-in for the actual expensive work
    with open(filename, 'w') as f:
        f.write("I am done!")

Now suppose I want to schedule a number of these tasks in dask, which then takes the requests asynchronously and runs the functions one after the other. I'm currently setting up a dask client object...

import dask.distributed

cluster = dask.distributed.LocalCluster(n_workers=1, processes=False)  # my attempt at sequential job processing
client = dask.distributed.Client(cluster)

...and then scheduling the jobs interactively (from IPython):

>>> client.submit(costly_function, "result1.txt")
>>> client.submit(costly_function, "result2.txt")
>>> client.submit(costly_function, "result3.txt")

The problem I'm running into is that these tasks run in parallel rather than consecutively, and in my particular case this causes concurrency problems.

So my question is: what is the correct way to set up a job queue like the one I describe above in dask?

Okay, I think I may have a solution (feel free to come up with better ones, though!). It requires modifying the previous costly function slightly:

def costly_function(filename, prev_job=None):
    # prev_job is not used in the body; it only exists so that the
    # previous job's Future can be passed in, making dask wait for
    # that job to finish before starting this one.
    time.sleep(10)
    with open(filename, 'w') as f:
        f.write("I am done!")

cluster = dask.distributed.LocalCluster(n_workers=1, processes=False)  # my attempt at sequential job processing
client = dask.distributed.Client(cluster)
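
As an aside, even with n_workers=1 a local worker can still execute several tasks concurrently on multiple threads, which is presumably why my original attempt ran the jobs in parallel. If you want the worker itself to be strictly one-task-at-a-time, LocalCluster also takes a threads_per_worker argument; a minimal sketch of the same setup with the worker pinned to a single thread:

import dask.distributed

# One worker with one thread: at most one task executes at a time.
cluster = dask.distributed.LocalCluster(n_workers=1, threads_per_worker=1,
                                        processes=False)
client = dask.distributed.Client(cluster)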

And then in an interactive context you would write the following:

>>> future = client.submit(costly_function, "result1.txt")
>>> future = client.submit(costly_function, "result2.txt", prev_job=future)
>>> future = client.submit(costly_function, "result3.txt", prev_job=future)