运行 在 SLURM 集群上使用 dask_jobqueue scikit 学习

Running scikit learn using dask_jobqueue on a SLURM cluster

我有一个带有共享 NFS 文件夹的 4xRaspberry pi3 SLURM 集群。 4 个工人(主人也是一个工人,只使用其 4 个核心中的 3 个)

集群工作正常(我有 运行 一些使用 mpiexec 的并行 python 示例)。 现在,我想尝试一个 scikit-learn 示例,我看到的一些教程是将 DASK-jobqueue 与 SLURM 结合使用。

我的代码看起来像这样:

from dask_jobqueue import SLURMCluster

cluster = SLURMCluster( job_extra=['--partition=picluster'],
                        queue='myqueue',
                        cores=4,
                        memory='1GB'
                        )

cluster.scale(4) #the number of nodes to request

print(cluster.job_script())



from dask.distributed import Client
client = Client(cluster)






import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import brier_score_loss

from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import KFold, GridSearchCV

import numpy as np


#load the data from file
preds_trainval_file='./Predictions_TRAIN.csv'
outc_trainval_file = './Outcome_TRAIN.csv'
preds_test_file='./Predictions_TEST.csv'
outc_test_file = './Outcome_TEST.csv'

X_trainval= np.loadtxt(preds_trainval_file, delimiter=',')
y_trainval,_,_ = np.loadtxt(outc_trainval_file, delimiter=',' , usecols=(0, 1, 2), unpack=True)
X_test = np.loadtxt(preds_test_file, delimiter=',')
y_test,_ = np.loadtxt(outc_test_file, delimiter=',' , usecols=(0, 1), unpack=True)





#setup the classifier and perform cross validation
model = LogisticRegression( penalty='elasticnet', solver='saga', warm_start=True, max_iter=10000); param_grid= { 'l1_ratio' : [0, 0.25, 0.5, 0.75,  1],  'C':[0.1, 0.25, 0.5, 0.75, 1, 1.25]}


#setup grid search on the train+val data.
kfold = KFold(n_splits=5, shuffle=True)
grid_search = GridSearchCV(model, param_grid, cv=kfold,  scoring='neg_brier_score', n_jobs=-1)


import joblib

with joblib.parallel_backend('dask'):
  grid_search.fit(X_trainval, y_trainval)



y_prob=grid_search.predict_proba(X_test)
print(brier_score_loss(y_test, y_prob[:,0], pos_label=1))


据我所知,这是一个非常标准的设置,可以利用 scikit 的内置并行化。

当我 运行 这个脚本时,我得到以下信息:

pi@node01:/clusterfs/Python_scripts/Expert_ensemble $ python3 ensemble_tests.py
#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -p myqueue
#SBATCH -n 1
#SBATCH --cpus-per-task=4
#SBATCH --mem=954M
#SBATCH -t 00:30:00
#SBATCH --partition=picluster

/usr/bin/python3 -m distributed.cli.dask_worker tcp://192.168.1.10:38817 --nthreads 1 --nprocs 4 --memory-limit 250.00MB --name dummy-name --nanny --death-timeout 60 --protocol tcp://

Task exception was never retrieved
future: <Task finished coro=<_wrap_awaitable() done, defined at /usr/lib/python3.7/asyncio/tasks.py:596> exception=RuntimeError('Command exited with non-zero exit code.\nExit code: 1\nCommand:\nsbatch /tmp/tmpz8a3jhys.sh\nstdout:\n\nstderr:\nsbatch: error: Memory specification can not be satisfied\nsbatch: error: Batch job submission failed: Requested node configuration is not available\n\n')>
Traceback (most recent call last):
  File "/usr/lib/python3.7/asyncio/tasks.py", line 603, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/home/pi/.local/lib/python3.7/site-packages/distributed/deploy/spec.py", line 71, in _
    await self.start()
  File "/home/pi/.local/lib/python3.7/site-packages/dask_jobqueue/core.py", line 324, in start
    out = await self._submit_job(fn)
  File "/home/pi/.local/lib/python3.7/site-packages/dask_jobqueue/core.py", line 307, in _submit_job
    return self._call(shlex.split(self.submit_command) + [script_filename])
  File "/home/pi/.local/lib/python3.7/site-packages/dask_jobqueue/core.py", line 407, in _call
    "stderr:\n{}\n".format(proc.returncode, cmd_str, out, err)
RuntimeError: Command exited with non-zero exit code.
Exit code: 1
Command:
sbatch /tmp/tmpz8a3jhys.sh
stdout:

stderr:
sbatch: error: Memory specification can not be satisfied
sbatch: error: Batch job submission failed: Requested node configuration is not available


tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x766f41b0>>, <Task finished coro=<SpecCluster._correct_state_internal() done, defined at /home/pi/.local/lib/python3.7/site-packages/distributed/deploy/spec.py:325> exception=RuntimeError('Command exited with non-zero exit code.\nExit code: 1\nCommand:\nsbatch /tmp/tmpc0ary0k1.sh\nstdout:\n\nstderr:\nsbatch: error: Memory specification can not be satisfied\nsbatch: error: Batch job submission failed: Requested node configuration is not available\n\n')>)
Traceback (most recent call last):
  File "/home/pi/.local/lib/python3.7/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/home/pi/.local/lib/python3.7/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/home/pi/.local/lib/python3.7/site-packages/distributed/deploy/spec.py", line 360, in _correct_state_internal
    await w  # for tornado gen.coroutine support
  File "/home/pi/.local/lib/python3.7/site-packages/distributed/deploy/spec.py", line 71, in _
    await self.start()
  File "/home/pi/.local/lib/python3.7/site-packages/dask_jobqueue/core.py", line 324, in start
    out = await self._submit_job(fn)
  File "/home/pi/.local/lib/python3.7/site-packages/dask_jobqueue/core.py", line 307, in _submit_job
    return self._call(shlex.split(self.submit_command) + [script_filename])
  File "/home/pi/.local/lib/python3.7/site-packages/dask_jobqueue/core.py", line 407, in _call
    "stderr:\n{}\n".format(proc.returncode, cmd_str, out, err)
RuntimeError: Command exited with non-zero exit code.
Exit code: 1
Command:
sbatch /tmp/tmpc0ary0k1.sh
stdout:

stderr:
sbatch: error: Memory specification can not be satisfied
sbatch: error: Batch job submission failed: Requested node configuration is not available


Task exception was never retrieved
future: <Task finished coro=<_wrap_awaitable() done, defined at /usr/lib/python3.7/asyncio/tasks.py:596> exception=RuntimeError('Command exited with non-zero exit code.\nExit code: 1\nCommand:\nsbatch /tmp/tmp3sezvy1f.sh\nstdout:\n\nstderr:\nsbatch: error: Memory specification can not be satisfied\nsbatch: error: Batch job submission failed: Requested node configuration is not available\n\n')>
Traceback (most recent call last):
  File "/usr/lib/python3.7/asyncio/tasks.py", line 603, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/home/pi/.local/lib/python3.7/site-packages/distributed/deploy/spec.py", line 71, in _
    await self.start()
  File "/home/pi/.local/lib/python3.7/site-packages/dask_jobqueue/core.py", line 324, in start
    out = await self._submit_job(fn)
  File "/home/pi/.local/lib/python3.7/site-packages/dask_jobqueue/core.py", line 307, in _submit_job
    return self._call(shlex.split(self.submit_command) + [script_filename])
  File "/home/pi/.local/lib/python3.7/site-packages/dask_jobqueue/core.py", line 407, in _call
    "stderr:\n{}\n".format(proc.returncode, cmd_str, out, err)
RuntimeError: Command exited with non-zero exit code.
Exit code: 1
Command:
sbatch /tmp/tmp3sezvy1f.sh
stdout:

stderr:
sbatch: error: Memory specification can not be satisfied
sbatch: error: Batch job submission failed: Requested node configuration is not available

我不确定我做错了什么。无论是在SLURMCluster配置中还是其他什么。

sinfo 的输出是:

PARTITION  AVAIL  TIMELIMIT  NODES  STATE NODELIST
picluster*    up   infinite      4   idle node[01-04]

scontrol show nodes 的输出是:

scontrol show nodes
NodeName=node01 Arch=armv7l CoresPerSocket=1
   CPUAlloc=0 CPUTot=3 CPULoad=0.09
   AvailableFeatures=(null)
   ActiveFeatures=(null)
   Gres=(null)
   NodeAddr=192.168.2.10 NodeHostName=node01 Version=18.08
   OS=Linux 5.10.11-v7+ #1399 SMP Thu Jan 28 12:06:05 GMT 2021
   RealMemory=1 AllocMem=0 FreeMem=800 Sockets=3 Boards=1
   State=IDLE ThreadsPerCore=1 TmpDisk=0 Weight=1 Owner=N/A MCS_label=N/A
   Partitions=picluster
   BootTime=2021-02-20T05:49:48 SlurmdStartTime=2021-02-20T05:50:03
   CfgTRES=cpu=3,mem=1M,billing=3
   AllocTRES=
   CapWatts=n/a
   CurrentWatts=0 LowestJoules=0 ConsumedJoules=0
   ExtSensorsJoules=n/s ExtSensorsWatts=0 ExtSensorsTemp=n/s


NodeName=node02 Arch=armv7l CoresPerSocket=1
   CPUAlloc=0 CPUTot=4 CPULoad=0.27
   AvailableFeatures=(null)
   ActiveFeatures=(null)
   Gres=(null)
   NodeAddr=192.168.2.11 NodeHostName=node02 Version=18.08
   OS=Linux 5.10.11-v7+ #1399 SMP Thu Jan 28 12:06:05 GMT 2021
   RealMemory=1 AllocMem=0 FreeMem=813 Sockets=4 Boards=1
   State=IDLE ThreadsPerCore=1 TmpDisk=0 Weight=1 Owner=N/A MCS_label=N/A
   Partitions=picluster
   BootTime=2021-02-20T05:49:37 SlurmdStartTime=2021-02-20T05:50:10
   CfgTRES=cpu=4,mem=1M,billing=4
   AllocTRES=
   CapWatts=n/a
   CurrentWatts=0 LowestJoules=0 ConsumedJoules=0
   ExtSensorsJoules=n/s ExtSensorsWatts=0 ExtSensorsTemp=n/s


NodeName=node03 Arch=armv7l CoresPerSocket=1
   CPUAlloc=0 CPUTot=4 CPULoad=0.24
   AvailableFeatures=(null)
   ActiveFeatures=(null)
   Gres=(null)
   NodeAddr=192.168.2.12 NodeHostName=node03 Version=18.08
   OS=Linux 5.10.11-v7+ #1399 SMP Thu Jan 28 12:06:05 GMT 2021
   RealMemory=1 AllocMem=0 FreeMem=821 Sockets=4 Boards=1
   State=IDLE ThreadsPerCore=1 TmpDisk=0 Weight=1 Owner=N/A MCS_label=N/A
   Partitions=picluster
   BootTime=2021-02-20T05:49:37 SlurmdStartTime=2021-02-20T05:50:09
   CfgTRES=cpu=4,mem=1M,billing=4
   AllocTRES=
   CapWatts=n/a
   CurrentWatts=0 LowestJoules=0 ConsumedJoules=0
   ExtSensorsJoules=n/s ExtSensorsWatts=0 ExtSensorsTemp=n/s


NodeName=node04 Arch=armv7l CoresPerSocket=1
   CPUAlloc=0 CPUTot=4 CPULoad=0.14
   AvailableFeatures=(null)
   ActiveFeatures=(null)
   Gres=(null)
   NodeAddr=192.168.2.13 NodeHostName=node04 Version=18.08
   OS=Linux 5.10.11-v7+ #1399 SMP Thu Jan 28 12:06:05 GMT 2021
   RealMemory=1 AllocMem=0 FreeMem=813 Sockets=4 Boards=1
   State=IDLE ThreadsPerCore=1 TmpDisk=0 Weight=1 Owner=N/A MCS_label=N/A
   Partitions=picluster
   BootTime=2021-02-20T05:49:40 SlurmdStartTime=2021-02-20T05:50:08
   CfgTRES=cpu=4,mem=1M,billing=4
   AllocTRES=
   CapWatts=n/a
   CurrentWatts=0 LowestJoules=0 ConsumedJoules=0
   ExtSensorsJoules=n/s ExtSensorsWatts=0 ExtSensorsTemp=n/s


如您所见,只有主节点 (node01) 分配了 CPUTot=3。所有其他节点都有标准 4。但我还测试了重新配置集群,所有节点都具有相同的 CPUTot=4,并且在 运行 宁 python 脚本时仍然得到相同的错误。另外,我试过只为集群中的每个节点申请500MB的内存,但还是报同样的错误。

感谢任何帮助。

谢谢

好的,所以我找到了解决方案。我不确定问题出在哪里,但您可以通过使用 header_skip 选项覆盖内存要求来覆盖内存问题。所以改变行从

cluster = SLURMCluster( job_extra=['--partition=picluster'],
                        queue='myqueue',
                        cores=4,
                        memory='1GB'
                        )

cluster = SLURMCluster( header_skip=['--mem'],
                        queue='picluster',
                        cores=4,
                        memory='1GB'
                        )

在那之后,它似乎工作正常。但是还是不明白是什么问题is/was.