Dask分布式顺序执行任务

Dask distributed executes tasks sequentially

我有一个使用 LocalCluster 的管道:

from distributed import Client
client = Client()

list_of_queries = [...]  # say 1_000 queries

loaded_data = client.map(sql_data_loader, list_of_queries)

processed_data = client.map(data_processor, loaded_data)

writer_results = client.map(data_writer, processed_data)

results = client.gather(writer_results)

一切正常,但与我预期的不太一样。

查看仪表板的状态页面,我看到类似这样的内容:

sql_data_loader             900 / 1000
data_processor                0 / 1000
data_writer                   0 / 1000

即任务按顺序执行,而不是 "in parallel"。因此 data_processor 直到加载完所有 1000 个查询后才开始执行。 data_writer 等到 'data_processor' 处理完所有的期货。

根据以前使用 dask 的经验,其中使用 dask.delayed 而不是 client.map 预期的行为类似于:

sql_data_loader              50 / 1000
data_processor               10 / 1000
data_writer                   5 / 1000

这是一个错误的期望还是我在如何设置管道以确保类似于 dask.delayed 的行为方面遗漏了什么?

如果您 运行 一个接一个地绘制地图,那么一切都应该很好地进行管道化。

两个预期目标之间存在一些矛盾:

  1. 任务应该流水线,如你所愿
  2. 先提交的任务应该有更高的优先级

为了在这两个目标之间取得平衡,Dask 根据调用之间的延迟分配策略。如果两个 map 调用紧接着发生,那么 Dask 会假定它们是同一计算的一部分,但是如果它们相隔很长时间,那么 Dask 会假定它们是不同的计算,因此优先处理较早的任务。您可以使用 fifo_timeout 关键字

来控制它
client.map(f, ..., fifo_timeout='10 minutes')

这里是相关的documentation page

这里是一个示例,显示了将地图调用捆绑在一起时您想要的行为:

In [1]: from dask.distributed import Client

In [2]: client = Client(processes=False)

In [3]: import time

In [4]: def f(x):
   ...:     time.sleep(0.1)
   ...:     print('f', x)
   ...:     return x
   ...: 

In [5]: def g(x):
   ...:     time.sleep(0.1)
   ...:     print('g', x)
   ...:     return x
   ...: 

In [6]: futures = client.map(f, range(20))
   ...: futures = client.map(g, futures)
   ...: 

In [7]: f 0
f 1
f 2
f 3
f 5
f 4
f 6
f 7
g 0
g 1
g 3
g 2
g 4
g 5
g 6
g 7
f 8
f 9
f 10
f 11
f 12
g 8
f 13
g 9
g 10
g 11
g 12
f 14
g 13
f 15
f 16
f 17
g 14
f 18
g 15
f 19
g 16
g 17
g 18
g 19