如何使用 dask 执行多线程`merge()`?如何通过 qsub 使用多核?
How to execute a multi-threaded `merge()` with dask? How to use multiples cores via qsub?
我刚开始使用 dask,但我仍然对如何使用多线程或使用集群执行简单的 pandas 任务从根本上感到困惑。
让我们使用 pandas.merge()
和 dask
数据帧。
import dask.dataframe as dd
df1 = dd.read_csv("file1.csv")
df2 = dd.read_csv("file2.csv")
df3 = dd.merge(df1, df2)
现在,假设我要在我的 4 核笔记本电脑上 运行 这个。如何为这个任务分配 4 个线程?
看起来正确的方法是:
dask.set_options(get=dask.threaded.get)
df3 = dd.merge(df1, df2).compute()
这将使用尽可能多的线程(即,在您的笔记本电脑上存在尽可能多的具有共享内存的内核,4)?如何设置线程数?
假设我在一个有 100 个核心的设施中。我如何以与使用 qsub
向集群提交作业相同的方式提交它? (类似于 运行通过 MPI 在集群上执行任务?)
dask.set_options(get=dask.threaded.get)
df3 = dd.merge(df1, df2).compute
单机调度
Dask.dataframe 默认情况下将使用线程调度程序,线程数与计算机中的逻辑内核数一样多。
正如评论中所指出的,您可以使用 .compute()
方法的关键字参数来控制线程数或 Pool 实现。
分布式机器调度
您可以使用 dask.distributed to deploy dask workers across many nodes in a cluster。使用 qsub
执行此操作的一种方法是在本地启动 dask-scheduler
:
$ dask-scheduler
Scheduler started at 192.168.1.100:8786
然后使用qsub
启动多个dask-worker
进程,指向报告的地址:
$ qsub dask-worker 192.168.1.100:8786 ... <various options>
截至昨天,有一个实验包可以在任何启用 DRMAA 的系统(包括 SGE/qsub-like 系统)上执行此操作:https://github.com/dask/dask-drmaa
完成此操作后,您可以创建一个 dask.distributed.Client
对象,它将接管默认调度程序
from dask.distributed import Client
c = Client('192.168.1.100:8786') # now computations run by default on the cluster
多线程性能
请注意,从 Pandas 版本 0.19 开始,GIL 仍未针对 pd.merge
发布,因此我预计使用多线程不会带来巨大的速度提升。如果这对您很重要,那么我建议您在此处发表评论:https://github.com/pandas-dev/pandas/issues/13745
我刚开始使用 dask,但我仍然对如何使用多线程或使用集群执行简单的 pandas 任务从根本上感到困惑。
让我们使用 pandas.merge()
和 dask
数据帧。
import dask.dataframe as dd
df1 = dd.read_csv("file1.csv")
df2 = dd.read_csv("file2.csv")
df3 = dd.merge(df1, df2)
现在,假设我要在我的 4 核笔记本电脑上 运行 这个。如何为这个任务分配 4 个线程?
看起来正确的方法是:
dask.set_options(get=dask.threaded.get)
df3 = dd.merge(df1, df2).compute()
这将使用尽可能多的线程(即,在您的笔记本电脑上存在尽可能多的具有共享内存的内核,4)?如何设置线程数?
假设我在一个有 100 个核心的设施中。我如何以与使用 qsub
向集群提交作业相同的方式提交它? (类似于 运行通过 MPI 在集群上执行任务?)
dask.set_options(get=dask.threaded.get)
df3 = dd.merge(df1, df2).compute
单机调度
Dask.dataframe 默认情况下将使用线程调度程序,线程数与计算机中的逻辑内核数一样多。
正如评论中所指出的,您可以使用 .compute()
方法的关键字参数来控制线程数或 Pool 实现。
分布式机器调度
您可以使用 dask.distributed to deploy dask workers across many nodes in a cluster。使用 qsub
执行此操作的一种方法是在本地启动 dask-scheduler
:
$ dask-scheduler
Scheduler started at 192.168.1.100:8786
然后使用qsub
启动多个dask-worker
进程,指向报告的地址:
$ qsub dask-worker 192.168.1.100:8786 ... <various options>
截至昨天,有一个实验包可以在任何启用 DRMAA 的系统(包括 SGE/qsub-like 系统)上执行此操作:https://github.com/dask/dask-drmaa
完成此操作后,您可以创建一个 dask.distributed.Client
对象,它将接管默认调度程序
from dask.distributed import Client
c = Client('192.168.1.100:8786') # now computations run by default on the cluster
多线程性能
请注意,从 Pandas 版本 0.19 开始,GIL 仍未针对 pd.merge
发布,因此我预计使用多线程不会带来巨大的速度提升。如果这对您很重要,那么我建议您在此处发表评论:https://github.com/pandas-dev/pandas/issues/13745