使用 Dask 在大型集合上映射可变执行时间的函数
mapping a function of variable execution time over a large collection with Dask
我有大量条目 E 和函数 f: E --> pd.DataFrame
。对于不同的输入,函数 f 的执行时间可能会有很大差异。最后,所有 DataFrame 都应该连接成一个 DataFrame。
我想避免的情况是分区(为了示例使用 2 个分区),其中意外地所有快速函数执行都发生在分区 1 上,而所有慢速执行都发生在分区 2 上,因此没有最佳地使用工人。
partition 1:
[==][==][==]
partition 2:
[============][=============][===============]
--------------------time--------------------->
我目前的解决方案是迭代条目集合并使用 delayed
创建一个 Dask 图形,聚合延迟的部分 DataFrame 导致最终结果 DataFrame dd.from_delayed
.
delayed_dfs = []
for e in collection:
delayed_partial_df = delayed(f)(e, arg2, ...)
delayed_dfs.append(delayed_partial_df)
result_df = from_delayed(delayed_dfs, meta=make_meta({..}))
我推断 Dask 调度程序会负责将工作最佳分配给可用的工作人员。
- 这是一个正确的假设吗?
- 您认为整体做法合理吗?
正如上面评论所说,是的,你这样做是明智的。
任务最初会分配给工人,但如果一些工人比其他工人先完成分配的任务,那么他们会动态地从那些工作量过大的工人那里窃取任务。
另外如评论中所述,您可以考虑使用诊断仪表板来很好地了解调度程序正在做什么。所有关于工作负载、工作窃取等的信息都很容易查看。
我有大量条目 E 和函数 f: E --> pd.DataFrame
。对于不同的输入,函数 f 的执行时间可能会有很大差异。最后,所有 DataFrame 都应该连接成一个 DataFrame。
我想避免的情况是分区(为了示例使用 2 个分区),其中意外地所有快速函数执行都发生在分区 1 上,而所有慢速执行都发生在分区 2 上,因此没有最佳地使用工人。
partition 1:
[==][==][==]
partition 2:
[============][=============][===============]
--------------------time--------------------->
我目前的解决方案是迭代条目集合并使用 delayed
创建一个 Dask 图形,聚合延迟的部分 DataFrame 导致最终结果 DataFrame dd.from_delayed
.
delayed_dfs = []
for e in collection:
delayed_partial_df = delayed(f)(e, arg2, ...)
delayed_dfs.append(delayed_partial_df)
result_df = from_delayed(delayed_dfs, meta=make_meta({..}))
我推断 Dask 调度程序会负责将工作最佳分配给可用的工作人员。
- 这是一个正确的假设吗?
- 您认为整体做法合理吗?
正如上面评论所说,是的,你这样做是明智的。
任务最初会分配给工人,但如果一些工人比其他工人先完成分配的任务,那么他们会动态地从那些工作量过大的工人那里窃取任务。
另外如评论中所述,您可以考虑使用诊断仪表板来很好地了解调度程序正在做什么。所有关于工作负载、工作窃取等的信息都很容易查看。