使用分布式 Dask 调度程序重复执行任务
Repeated task execution using the distributed Dask scheduler
我正在使用 Dask 分布式调度程序,运行在本地设置一个调度程序和 5 个工作人员。我将 delayed()
任务列表提交给 compute()
。
当任务数量为 20(一个数字 >> 比工人的数量)并且每个任务至少需要 15 秒时,调度程序开始重新运行一些任务(或并行执行它们不止一次).
这是一个问题,因为任务修改了 SQL 数据库,如果它们再次 运行,它们最终会引发异常(由于数据库唯一性约束)。我没有在任何地方设置pure=True
(我相信默认设置是False
)。除此之外,Dask 图很简单(任务之间没有依赖关系)。
仍然不确定这是 Dask 中的功能还是错误。我有一种直觉,这可能与工人偷窃有关...
正确,如果一个任务分配给了一个工作人员,而另一个工作人员空闲了,它可能会选择从其他工作人员那里窃取多余的任务。有几率会窃取刚刚开始的任务运行,这样的话任务会运行两次
处理此问题的简洁方法是确保您的任务是幂等的,即使 运行 两次,它们 return 的结果也是相同的。这可能意味着在您的任务中处理您的数据库错误。
这是对数据密集型计算工作负载非常有用但对数据工程工作负载很糟糕的策略之一。设计一个同时满足这两种需求的系统是很棘手的。
我正在使用 Dask 分布式调度程序,运行在本地设置一个调度程序和 5 个工作人员。我将 delayed()
任务列表提交给 compute()
。
当任务数量为 20(一个数字 >> 比工人的数量)并且每个任务至少需要 15 秒时,调度程序开始重新运行一些任务(或并行执行它们不止一次).
这是一个问题,因为任务修改了 SQL 数据库,如果它们再次 运行,它们最终会引发异常(由于数据库唯一性约束)。我没有在任何地方设置pure=True
(我相信默认设置是False
)。除此之外,Dask 图很简单(任务之间没有依赖关系)。
仍然不确定这是 Dask 中的功能还是错误。我有一种直觉,这可能与工人偷窃有关...
正确,如果一个任务分配给了一个工作人员,而另一个工作人员空闲了,它可能会选择从其他工作人员那里窃取多余的任务。有几率会窃取刚刚开始的任务运行,这样的话任务会运行两次
处理此问题的简洁方法是确保您的任务是幂等的,即使 运行 两次,它们 return 的结果也是相同的。这可能意味着在您的任务中处理您的数据库错误。
这是对数据密集型计算工作负载非常有用但对数据工程工作负载很糟糕的策略之一。设计一个同时满足这两种需求的系统是很棘手的。