如何使用 dask 执行多线程`merge()`?如何通过 qsub 使用多核?

How to execute a multi-threaded `merge()` with dask? How to use multiples cores via qsub?

我刚开始使用 dask,但我仍然对如何使用多线程或使用集群执行简单的 pandas 任务从根本上感到困惑。

让我们使用 pandas.merge()dask 数据帧。

import dask.dataframe as dd

df1 = dd.read_csv("file1.csv")
df2 = dd.read_csv("file2.csv")

df3 = dd.merge(df1, df2)

现在,假设我要在我的 4 核笔记本电脑上 运行 这个。如何为这个任务分配 4 个线程?

看起来正确的方法是:

dask.set_options(get=dask.threaded.get)
df3 = dd.merge(df1, df2).compute()

这将使用尽可能多的线程(即,在您的笔记本电脑上存在尽可能多的具有共享内存的内核,4)?如何设置线程数?

假设我在一个有 100 个核心的设施中。我如何以与使用 qsub 向集群提交作业相同的方式提交它? (类似于 运行通过 MPI 在集群上执行任务?)

dask.set_options(get=dask.threaded.get)
df3 = dd.merge(df1, df2).compute

单机调度

Dask.dataframe 默认情况下将使用线程调度程序,线程数与计算机中的逻辑内核数一样多。

正如评论中所指出的,您可以使用 .compute() 方法的关键字参数来控制线程数或 Pool 实现。

分布式机器调度

您可以使用 dask.distributed to deploy dask workers across many nodes in a cluster。使用 qsub 执行此操作的一种方法是在本地启动 dask-scheduler

$ dask-scheduler
Scheduler started at 192.168.1.100:8786

然后使用qsub启动多个dask-worker进程,指向报告的地址:

$ qsub dask-worker 192.168.1.100:8786 ... <various options>

截至昨天,有一个实验包可以在任何启用 DRMAA 的系统(包括 SGE/qsub-like 系统)上执行此操作:https://github.com/dask/dask-drmaa

完成此操作后,您可以创建一个 dask.distributed.Client 对象,它将接管默认调度程序

from dask.distributed import Client
c = Client('192.168.1.100:8786')  # now computations run by default on the cluster

多线程性能

请注意,从 Pandas 版本 0.19 开始,GIL 仍未针对 pd.merge 发布,因此我预计使用多线程不会带来巨大的速度提升。如果这对您很重要,那么我建议您在此处发表评论:https://github.com/pandas-dev/pandas/issues/13745