在 HPC 集群中创建 Dask LocalCluster 实例时 SLURM 任务失败
SLURM task fails when creating an instance of the Dask LocalCluster in an HPC cluster
我正在使用命令 sbatch
和下一个配置排队任务:
#SBATCH --job-name=dask-test
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=10
#SBATCH --mem=80G
#SBATCH --time=00:30:00
#SBATCH --tmp=10G
#SBATCH --partition=normal
#SBATCH --qos=normal
python ./dask-test.py
python脚本大致如下:
import pandas as pd
import dask.dataframe as dd
import numpy as np
from dask.distributed import Client, LocalCluster
print("Generating LocalCluster...")
cluster = LocalCluster()
print("Generating Client...")
client = Client(cluster, processes=False)
print("Scaling client...")
client.scale(8)
data = dd.read_csv(
BASE_DATA_SOURCE + '/Data-BIGDATFILES-*.csv',
delimiter=';',
)
def get_min_dt():
min_dt = data.datetime.min().compute()
print("Min is {}".format())
print("Getting min dt...")
get_min_dt()
第一个问题是文本“Generating LocalCluster...”打印了 6 次,这让我怀疑脚本是否同时 运行 多次。
其次,几分钟后什么也没打印,我收到以下消息:
/anaconda3/lib/python3.7/site-packages/distributed/node.py:155: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 37396 instead
http_address["port"], self.http_server.port
很多次..最后是下一个,也是很多次:
Task exception was never retrieved
future: <Task finished coro=<_wrap_awaitable() done, defined at /cluster/home/user/anaconda3/lib/python3.7/asyncio/tasks.py:592> exception=RuntimeError('\n An attempt has been made to start a new process before the\n current process has finished its bootstrapping phase.\n\n This probably means that you are not using fork to start your\n child processes and you have forgotten to use the proper idiom\n in the main module:\n\n if __name__ == \'__main__\':\n freeze_support()\n ...\n\n The "freeze_support()" line can be omitted if the program\n is not going to be frozen to produce an executable.')>
Traceback (most recent call last):
File "/cluster/home/user/anaconda3/lib/python3.7/asyncio/tasks.py", line 599, in _wrap_awaitable
return (yield from awaitable.__await__())
File "/cluster/home/user/anaconda3/lib/python3.7/site-packages/distributed/core.py", line 290, in _
await self.start()
File "/cluster/home/user/anaconda3/lib/python3.7/site-packages/distributed/nanny.py", line 295, in start
response = await self.instantiate()
File "/cluster/home/user/anaconda3/lib/python3.7/site-packages/distributed/nanny.py", line 378, in instantiate
result = await self.process.start()
File "/cluster/home/user/anaconda3/lib/python3.7/site-packages/distributed/nanny.py", line 575, in start
await self.process.start()
File "/cluster/home/user/anaconda3/lib/python3.7/site-packages/distributed/process.py", line 34, in _call_and_set_future
res = func(*args, **kwargs)
File "/cluster/home/user/anaconda3/lib/python3.7/site-packages/distributed/process.py", line 202, in _start
process.start()
File "/cluster/home/user/anaconda3/lib/python3.7/multiprocessing/process.py", line 112, in start
self._popen = self._Popen(self)
File "/cluster/home/user/anaconda3/lib/python3.7/multiprocessing/context.py", line 284, in _Popen
return Popen(process_obj)
File "/cluster/home/user/anaconda3/lib/python3.7/multiprocessing/popen_spawn_posix.py", line 32, in __init__
super().__init__(process_obj)
File "/cluster/home/user/anaconda3/lib/python3.7/multiprocessing/popen_fork.py", line 20, in __init__
self._launch(process_obj)
File "/cluster/home/user/anaconda3/lib/python3.7/multiprocessing/popen_spawn_posix.py", line 42, in _launch
prep_data = spawn.get_preparation_data(process_obj._name)
File "/cluster/home/user/anaconda3/lib/python3.7/multiprocessing/spawn.py", line 143, in get_preparation_data
_check_not_importing_main()
File "/cluster/home/user/anaconda3/lib/python3.7/multiprocessing/spawn.py", line 136, in _check_not_importing_main
is not going to be frozen to produce an executable.''')
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
我已经尝试添加更多内核、更多内存、在实例化 Client
时设置 processes=False
以及许多其他操作,但我无法弄清楚问题出在哪里。
使用的library/software版本是:
- Python 3.7
- Pandas 1.0.5
- 达斯克 2.19.0
- slurm 17.11.7
我是不是设置错了什么?使用本地集群和客户端结构的方式是否正确?
经过一番研究,我找到了解决办法。不太确定原因,但非常确定它有效。
LocalCluster、Client 及其后的所有代码(将分发执行的代码)的实例化不得在 Python 脚本的模块级别。相反,此代码必须在方法中或 __main__ 块内,如下所示:
import pandas as pd
import dask.dataframe as dd
import numpy as np
from dask.distributed import Client, LocalCluster
if __name__ == "__main__":
print("Generating LocalCluster...")
cluster = LocalCluster()
print("Generating Client...")
client = Client(cluster, processes=False)
print("Scaling client...")
client.scale(8)
data = dd.read_csv(
BASE_DATA_SOURCE + '/Data-BIGDATFILES-*.csv',
delimiter=';',
)
def get_min_dt():
min_dt = data.datetime.min().compute()
print("Min is {}".format())
print("Getting min dt...")
get_min_dt()
这个简单的改变带来了不同。在该问题线程中找到了解决方案:https://github.com/dask/distributed/issues/2520#issuecomment-470817810
我正在使用命令 sbatch
和下一个配置排队任务:
#SBATCH --job-name=dask-test
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=10
#SBATCH --mem=80G
#SBATCH --time=00:30:00
#SBATCH --tmp=10G
#SBATCH --partition=normal
#SBATCH --qos=normal
python ./dask-test.py
python脚本大致如下:
import pandas as pd
import dask.dataframe as dd
import numpy as np
from dask.distributed import Client, LocalCluster
print("Generating LocalCluster...")
cluster = LocalCluster()
print("Generating Client...")
client = Client(cluster, processes=False)
print("Scaling client...")
client.scale(8)
data = dd.read_csv(
BASE_DATA_SOURCE + '/Data-BIGDATFILES-*.csv',
delimiter=';',
)
def get_min_dt():
min_dt = data.datetime.min().compute()
print("Min is {}".format())
print("Getting min dt...")
get_min_dt()
第一个问题是文本“Generating LocalCluster...”打印了 6 次,这让我怀疑脚本是否同时 运行 多次。 其次,几分钟后什么也没打印,我收到以下消息:
/anaconda3/lib/python3.7/site-packages/distributed/node.py:155: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 37396 instead
http_address["port"], self.http_server.port
很多次..最后是下一个,也是很多次:
Task exception was never retrieved
future: <Task finished coro=<_wrap_awaitable() done, defined at /cluster/home/user/anaconda3/lib/python3.7/asyncio/tasks.py:592> exception=RuntimeError('\n An attempt has been made to start a new process before the\n current process has finished its bootstrapping phase.\n\n This probably means that you are not using fork to start your\n child processes and you have forgotten to use the proper idiom\n in the main module:\n\n if __name__ == \'__main__\':\n freeze_support()\n ...\n\n The "freeze_support()" line can be omitted if the program\n is not going to be frozen to produce an executable.')>
Traceback (most recent call last):
File "/cluster/home/user/anaconda3/lib/python3.7/asyncio/tasks.py", line 599, in _wrap_awaitable
return (yield from awaitable.__await__())
File "/cluster/home/user/anaconda3/lib/python3.7/site-packages/distributed/core.py", line 290, in _
await self.start()
File "/cluster/home/user/anaconda3/lib/python3.7/site-packages/distributed/nanny.py", line 295, in start
response = await self.instantiate()
File "/cluster/home/user/anaconda3/lib/python3.7/site-packages/distributed/nanny.py", line 378, in instantiate
result = await self.process.start()
File "/cluster/home/user/anaconda3/lib/python3.7/site-packages/distributed/nanny.py", line 575, in start
await self.process.start()
File "/cluster/home/user/anaconda3/lib/python3.7/site-packages/distributed/process.py", line 34, in _call_and_set_future
res = func(*args, **kwargs)
File "/cluster/home/user/anaconda3/lib/python3.7/site-packages/distributed/process.py", line 202, in _start
process.start()
File "/cluster/home/user/anaconda3/lib/python3.7/multiprocessing/process.py", line 112, in start
self._popen = self._Popen(self)
File "/cluster/home/user/anaconda3/lib/python3.7/multiprocessing/context.py", line 284, in _Popen
return Popen(process_obj)
File "/cluster/home/user/anaconda3/lib/python3.7/multiprocessing/popen_spawn_posix.py", line 32, in __init__
super().__init__(process_obj)
File "/cluster/home/user/anaconda3/lib/python3.7/multiprocessing/popen_fork.py", line 20, in __init__
self._launch(process_obj)
File "/cluster/home/user/anaconda3/lib/python3.7/multiprocessing/popen_spawn_posix.py", line 42, in _launch
prep_data = spawn.get_preparation_data(process_obj._name)
File "/cluster/home/user/anaconda3/lib/python3.7/multiprocessing/spawn.py", line 143, in get_preparation_data
_check_not_importing_main()
File "/cluster/home/user/anaconda3/lib/python3.7/multiprocessing/spawn.py", line 136, in _check_not_importing_main
is not going to be frozen to produce an executable.''')
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
我已经尝试添加更多内核、更多内存、在实例化 Client
时设置 processes=False
以及许多其他操作,但我无法弄清楚问题出在哪里。
使用的library/software版本是:
- Python 3.7
- Pandas 1.0.5
- 达斯克 2.19.0
- slurm 17.11.7
我是不是设置错了什么?使用本地集群和客户端结构的方式是否正确?
经过一番研究,我找到了解决办法。不太确定原因,但非常确定它有效。
LocalCluster、Client 及其后的所有代码(将分发执行的代码)的实例化不得在 Python 脚本的模块级别。相反,此代码必须在方法中或 __main__ 块内,如下所示:
import pandas as pd
import dask.dataframe as dd
import numpy as np
from dask.distributed import Client, LocalCluster
if __name__ == "__main__":
print("Generating LocalCluster...")
cluster = LocalCluster()
print("Generating Client...")
client = Client(cluster, processes=False)
print("Scaling client...")
client.scale(8)
data = dd.read_csv(
BASE_DATA_SOURCE + '/Data-BIGDATFILES-*.csv',
delimiter=';',
)
def get_min_dt():
min_dt = data.datetime.min().compute()
print("Min is {}".format())
print("Getting min dt...")
get_min_dt()
这个简单的改变带来了不同。在该问题线程中找到了解决方案:https://github.com/dask/distributed/issues/2520#issuecomment-470817810