Dask分布式顺序执行任务
Dask distributed executes tasks sequentially
我有一个使用 LocalCluster
的管道:
from distributed import Client
client = Client()
list_of_queries = [...] # say 1_000 queries
loaded_data = client.map(sql_data_loader, list_of_queries)
processed_data = client.map(data_processor, loaded_data)
writer_results = client.map(data_writer, processed_data)
results = client.gather(writer_results)
一切正常,但与我预期的不太一样。
查看仪表板的状态页面,我看到类似这样的内容:
sql_data_loader 900 / 1000
data_processor 0 / 1000
data_writer 0 / 1000
即任务按顺序执行,而不是 "in parallel"。因此 data_processor
直到加载完所有 1000 个查询后才开始执行。 data_writer
等到 'data_processor' 处理完所有的期货。
根据以前使用 dask 的经验,其中使用 dask.delayed
而不是 client.map
预期的行为类似于:
sql_data_loader 50 / 1000
data_processor 10 / 1000
data_writer 5 / 1000
这是一个错误的期望还是我在如何设置管道以确保类似于 dask.delayed
的行为方面遗漏了什么?
如果您 运行 一个接一个地绘制地图,那么一切都应该很好地进行管道化。
两个预期目标之间存在一些矛盾:
- 任务应该流水线,如你所愿
- 先提交的任务应该有更高的优先级
为了在这两个目标之间取得平衡,Dask 根据调用之间的延迟分配策略。如果两个 map 调用紧接着发生,那么 Dask 会假定它们是同一计算的一部分,但是如果它们相隔很长时间,那么 Dask 会假定它们是不同的计算,因此优先处理较早的任务。您可以使用 fifo_timeout
关键字
来控制它
client.map(f, ..., fifo_timeout='10 minutes')
这里是相关的documentation page
这里是一个示例,显示了将地图调用捆绑在一起时您想要的行为:
In [1]: from dask.distributed import Client
In [2]: client = Client(processes=False)
In [3]: import time
In [4]: def f(x):
...: time.sleep(0.1)
...: print('f', x)
...: return x
...:
In [5]: def g(x):
...: time.sleep(0.1)
...: print('g', x)
...: return x
...:
In [6]: futures = client.map(f, range(20))
...: futures = client.map(g, futures)
...:
In [7]: f 0
f 1
f 2
f 3
f 5
f 4
f 6
f 7
g 0
g 1
g 3
g 2
g 4
g 5
g 6
g 7
f 8
f 9
f 10
f 11
f 12
g 8
f 13
g 9
g 10
g 11
g 12
f 14
g 13
f 15
f 16
f 17
g 14
f 18
g 15
f 19
g 16
g 17
g 18
g 19
我有一个使用 LocalCluster
的管道:
from distributed import Client
client = Client()
list_of_queries = [...] # say 1_000 queries
loaded_data = client.map(sql_data_loader, list_of_queries)
processed_data = client.map(data_processor, loaded_data)
writer_results = client.map(data_writer, processed_data)
results = client.gather(writer_results)
一切正常,但与我预期的不太一样。
查看仪表板的状态页面,我看到类似这样的内容:
sql_data_loader 900 / 1000
data_processor 0 / 1000
data_writer 0 / 1000
即任务按顺序执行,而不是 "in parallel"。因此 data_processor
直到加载完所有 1000 个查询后才开始执行。 data_writer
等到 'data_processor' 处理完所有的期货。
根据以前使用 dask 的经验,其中使用 dask.delayed
而不是 client.map
预期的行为类似于:
sql_data_loader 50 / 1000
data_processor 10 / 1000
data_writer 5 / 1000
这是一个错误的期望还是我在如何设置管道以确保类似于 dask.delayed
的行为方面遗漏了什么?
如果您 运行 一个接一个地绘制地图,那么一切都应该很好地进行管道化。
两个预期目标之间存在一些矛盾:
- 任务应该流水线,如你所愿
- 先提交的任务应该有更高的优先级
为了在这两个目标之间取得平衡,Dask 根据调用之间的延迟分配策略。如果两个 map 调用紧接着发生,那么 Dask 会假定它们是同一计算的一部分,但是如果它们相隔很长时间,那么 Dask 会假定它们是不同的计算,因此优先处理较早的任务。您可以使用 fifo_timeout
关键字
client.map(f, ..., fifo_timeout='10 minutes')
这里是相关的documentation page
这里是一个示例,显示了将地图调用捆绑在一起时您想要的行为:
In [1]: from dask.distributed import Client
In [2]: client = Client(processes=False)
In [3]: import time
In [4]: def f(x):
...: time.sleep(0.1)
...: print('f', x)
...: return x
...:
In [5]: def g(x):
...: time.sleep(0.1)
...: print('g', x)
...: return x
...:
In [6]: futures = client.map(f, range(20))
...: futures = client.map(g, futures)
...:
In [7]: f 0
f 1
f 2
f 3
f 5
f 4
f 6
f 7
g 0
g 1
g 3
g 2
g 4
g 5
g 6
g 7
f 8
f 9
f 10
f 11
f 12
g 8
f 13
g 9
g 10
g 11
g 12
f 14
g 13
f 15
f 16
f 17
g 14
f 18
g 15
f 19
g 16
g 17
g 18
g 19