使用 Dask 在大型集合上映射可变执行时间的函数

mapping a function of variable execution time over a large collection with Dask

我有大量条目 E 和函数 f: E --> pd.DataFrame。对于不同的输入,函数 f 的执行时间可能会有很大差异。最后,所有 DataFrame 都应该连接成一个 DataFrame。

我想避免的情况是分区(为了示例使用 2 个分区),其中意外地所有快速函数执行都发生在分区 1 上,而所有慢速执行都发生在分区 2 上,因此没有最佳地使用工人。

partition 1:
[==][==][==]

partition 2:
[============][=============][===============]

--------------------time--------------------->

我目前的解决方案是迭代条目集合并使用 delayed 创建一个 Dask 图形,聚合延迟的部分 DataFrame 导致最终结果 DataFrame dd.from_delayed.

delayed_dfs = []  

for e in collection:
    delayed_partial_df = delayed(f)(e, arg2, ...)

    delayed_dfs.append(delayed_partial_df)

result_df = from_delayed(delayed_dfs, meta=make_meta({..}))

我推断 Dask 调度程序会负责将工作最佳分配给可用的工作人员。

  1. 这是一个正确的假设吗?
  2. 您认为整体做法合理吗?

正如上面评论所说,是的,你这样做是明智的。

任务最初会分配给工人,但如果一些工人比其他工人先完成分配的任务,那么他们会动态地从那些工作量过大的工人那里窃取任务。

另外如评论中所述,您可以考虑使用诊断仪表板来很好地了解调度程序正在做什么。所有关于工作负载、工作窃取等的信息都很容易查看。

http://distributed.readthedocs.io/en/latest/web.html