How to terminate workers started by the dask multiprocessing scheduler?
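After using the dask multiprocessing scheduler for a long time, I noticed that the Python processes started by the multiprocessing scheduler take up a lot of memory. How can I restart the worker pool?
Update: you can do this to kill the workers started by the multiprocessing scheduler: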
from dask.context import _globals
pool = _globals.pop('pool') # remove the pool from globals to make dask create a new one
pool.close()
pool.terminate()
pool.join()
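Since `_globals` is a private name, it may not be available in every dask version. The close/terminate/join sequence itself is plain `multiprocessing`, so here is a minimal sketch of the same restart on a pool you create yourself. The helper names `restart_pool` and `worker_pids` are made up for this illustration and are not part of dask.

```python
import multiprocessing
import os


def restart_pool(old_pool, make_pool):
    """Shut down old_pool and return a fresh pool built by make_pool().

    make_pool is a hypothetical zero-argument factory, for example
    lambda: multiprocessing.Pool(processes=2).
    """
    old_pool.close()      # stop accepting new tasks
    old_pool.terminate()  # stop the worker processes immediately
    old_pool.join()       # wait until the workers have exited
    return make_pool()


def worker_pids(pool, samples=8):
    """Collect the PIDs of the workers that answer a few trivial calls."""
    return {pool.apply(os.getpid) for _ in range(samples)}


if __name__ == "__main__":
    make_pool = lambda: multiprocessing.Pool(processes=2)
    pool = make_pool()
    before = worker_pids(pool)
    pool = restart_pool(pool, make_pool)
    after = worker_pids(pool)
    print("old worker PIDs:", sorted(before))
    print("new worker PIDs:", sorted(after))
    pool.close()
    pool.join()
```

The memory held by the old workers goes back to the operating system once those processes exit, which is the whole point of replacing the pool rather than reusing it.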
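First answer:
For tasks that consume a lot of memory, I prefer to use the distributed scheduler, even on localhost.
It is quite straightforward:
- Start the scheduler in one shell: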
$ dask-scheduler
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Scheduler at: 1.2.3.4:8786
distributed.scheduler - INFO - http at: 1.2.3.4:9786
distributed.bokeh.application - INFO - Web UI: http://1.2.3.4:8787/status/
distributed.scheduler - INFO - -----------------------------------------------
distributed.core - INFO - Connection from 1.2.3.4:56240 to Scheduler
distributed.core - INFO - Connection from 1.2.3.4:56241 to Scheduler
distributed.core - INFO - Connection from 1.2.3.4:56242 to Scheduler
- Start the workers in another shell; you can adjust the parameters to suit your needs:
$ dask-worker --nprocs 8 --nthreads 1 --memory-limit .8 1.2.3.4:8786
distributed.nanny - INFO - Start Nanny at: 127.0.0.1:61760
distributed.nanny - INFO - Start Nanny at: 127.0.0.1:61761
distributed.nanny - INFO - Start Nanny at: 127.0.0.1:61762
distributed.nanny - INFO - Start Nanny at: 127.0.0.1:61763
distributed.worker - INFO - Start worker at: 127.0.0.1:61765
distributed.worker - INFO - nanny at: 127.0.0.1:61760
distributed.worker - INFO - http at: 127.0.0.1:61764
distributed.worker - INFO - Waiting to connect to: 127.0.0.1:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 1
distributed.nanny - INFO - Start Nanny at: 127.0.0.1:61767
distributed.worker - INFO - Memory: 1.72 GB
distributed.worker - INFO - Local Directory: /var/folders/55/nbg15c6j4k3cg06tjfhqypd40000gn/T/nanny-11ygswb9
...
- Finally, use the distributed.Client class to submit your jobs.
In [1]: from distributed import Client
In [2]: client = Client('1.2.3.4:8786')
In [3]: client
<Client: scheduler="127.0.0.1:61829" processes=8 cores=8>
In [4]: from distributed.diagnostics import progress
In [5]: import dask.bag
In [6]: data = dask.bag.range(10000, 8)
In [7]: data
dask.bag
In [8]: future = client.compute(data.sum())
In [9]: progress(future)
[########################################] | 100% Completed | 0.0s
In [10]: future.result()
49995000
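I find this approach more reliable than the default scheduler. I prefer to submit tasks explicitly and work with the futures so that I can use the progress widget, which is really nice in a notebook. You can also keep doing other things while waiting for the results.
If you get errors due to memory issues, you can restart the workers or the scheduler (start all over again), use smaller chunks of data, and try again.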
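The "use smaller chunks and try again" step can be sketched in plain Python. This is only an illustration under assumptions: `retry_with_smaller_chunks` and `sum_in_chunks` are hypothetical helpers, and the sketch assumes the failure surfaces as an exception you can catch (`MemoryError` by default). With a real cluster the failure may show up as a different exception type, which you would pass in `retry_on`.

```python
def sum_in_chunks(values, chunk_size):
    """Toy workload: sum a list one chunk at a time."""
    total = 0
    for start in range(0, len(values), chunk_size):
        total += sum(values[start:start + chunk_size])
    return total


def retry_with_smaller_chunks(func, values, chunk_size,
                              min_chunk_size=1, retry_on=(MemoryError,)):
    """Call func(values, chunk_size), halving chunk_size after each failure.

    Re-raises the last error once chunk_size cannot shrink any further.
    """
    while True:
        try:
            return func(values, chunk_size)
        except retry_on:
            if chunk_size <= min_chunk_size:
                raise
            chunk_size = max(min_chunk_size, chunk_size // 2)


if __name__ == "__main__":
    data = list(range(10000))
    print(retry_with_smaller_chunks(sum_in_chunks, data, chunk_size=800))
```

Halving is an arbitrary choice here; any schedule that shrinks the chunk size would follow the same pattern.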