已尝试在当前进程完成其引导阶段之前启动新进程
An attempt has been made to start a new process before the current process has finished its bootstrapping phase
我是 dask 的新手,我发现有一个模块可以很容易地实现并行化,这真是太好了。我正在做一个项目,在这个项目中,我能够在一台机器上将一个循环并行化为 you can see here 。但是,我想转到 dask.distributed
。我对上面的 class 应用了以下更改:
diff --git a/mlchem/fingerprints/gaussian.py b/mlchem/fingerprints/gaussian.py
index ce6a72b..89f8638 100644
--- a/mlchem/fingerprints/gaussian.py
+++ b/mlchem/fingerprints/gaussian.py
@@ -6,7 +6,7 @@ from sklearn.externals import joblib
from .cutoff import Cosine
from collections import OrderedDict
import dask
-import dask.multiprocessing
+from dask.distributed import Client
import time
@@ -141,13 +141,14 @@ class Gaussian(object):
for image in images.items():
computations.append(self.fingerprints_per_image(image))
+ client = Client()
if self.scaler is None:
- feature_space = dask.compute(*computations, scheduler='processes',
+ feature_space = dask.compute(*computations, scheduler='distributed',
num_workers=self.cores)
feature_space = OrderedDict(feature_space)
else:
stacked_features = dask.compute(*computations,
- scheduler='processes',
+ scheduler='distributed',
num_workers=self.cores)
stacked_features = numpy.array(stacked_features)
这样做会产生此错误:
File "/usr/local/Cellar/python/3.7.2_2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 136, in _check_not_importing_main
is not going to be frozen to produce an executable.''')
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
我尝试了多种添加 if __name__ == '__main__':
的方法,但都没有成功。这可以是 reproduced by running this example。如果有人能帮我解决这个问题,我将不胜感激。我不知道应该如何更改我的代码以使其工作。
谢谢。
编辑:示例是 cu_training.py
。
Client
命令启动新进程,因此它必须位于 if __name__ == '__main__':
块内,如 SO question or this GitHub issue
中所述
这与多处理模块相同
即使在包含 main if __name__ == '__main__':
之后,我的代码也遇到了几个问题。
我正在使用多个 python 文件和模块,而 multiprocess 仅由一个函数用于某些采样操作。 唯一对我有用的修复方法是在第一个文件和整个代码的第一行 中包含 main(甚至包括导入)。以下效果很好:
if __name__ == '__main__':
from mjrl.utils.gym_env import GymEnv
from mjrl.policies.gaussian_mlp import MLP
from mjrl.baselines.quadratic_baseline import QuadraticBaseline
from mjrl.baselines.mlp_baseline import MLPBaseline
from mjrl.algos.npg_cg import NPG
from mjrl.algos.dapg import DAPG
from mjrl.algos.behavior_cloning import BC
from mjrl.utils.train_agent import train_agent
from mjrl.samplers.core import sample_paths
import os
import json
import mjrl.envs
import mj_envs
import time as timer
import pickle
import argparse
import numpy as np
# ===============================================================================
# Get command line arguments
# ===============================================================================
parser = argparse.ArgumentParser(description='Policy gradient algorithms with demonstration data.')
parser.add_argument('--output', type=str, required=True, help='location to store results')
parser.add_argument('--config', type=str, required=True, help='path to config file with exp params')
args = parser.parse_args()
JOB_DIR = args.output
if not os.path.exists(JOB_DIR):
os.mkdir(JOB_DIR)
with open(args.config, 'r') as f:
job_data = eval(f.read())
assert 'algorithm' in job_data.keys()
assert any([job_data['algorithm'] == a for a in ['NPG', 'BCRL', 'DAPG']])
job_data['lam_0'] = 0.0 if 'lam_0' not in job_data.keys() else job_data['lam_0']
job_data['lam_1'] = 0.0 if 'lam_1' not in job_data.keys() else job_data['lam_1']
EXP_FILE = JOB_DIR + '/job_config.json'
with open(EXP_FILE, 'w') as f:
json.dump(job_data, f, indent=4)
# ===============================================================================
# Train Loop
# ===============================================================================
e = GymEnv(job_data['env'])
policy = MLP(e.spec, hidden_sizes=job_data['policy_size'], seed=job_data['seed'])
baseline = MLPBaseline(e.spec, reg_coef=1e-3, batch_size=job_data['vf_batch_size'],
epochs=job_data['vf_epochs'], learn_rate=job_data['vf_learn_rate'])
我是 dask 的新手,我发现有一个模块可以很容易地实现并行化,这真是太好了。我正在做一个项目,在这个项目中,我能够在一台机器上将一个循环并行化为 you can see here 。但是,我想转到 dask.distributed
。我对上面的 class 应用了以下更改:
diff --git a/mlchem/fingerprints/gaussian.py b/mlchem/fingerprints/gaussian.py
index ce6a72b..89f8638 100644
--- a/mlchem/fingerprints/gaussian.py
+++ b/mlchem/fingerprints/gaussian.py
@@ -6,7 +6,7 @@ from sklearn.externals import joblib
from .cutoff import Cosine
from collections import OrderedDict
import dask
-import dask.multiprocessing
+from dask.distributed import Client
import time
@@ -141,13 +141,14 @@ class Gaussian(object):
for image in images.items():
computations.append(self.fingerprints_per_image(image))
+ client = Client()
if self.scaler is None:
- feature_space = dask.compute(*computations, scheduler='processes',
+ feature_space = dask.compute(*computations, scheduler='distributed',
num_workers=self.cores)
feature_space = OrderedDict(feature_space)
else:
stacked_features = dask.compute(*computations,
- scheduler='processes',
+ scheduler='distributed',
num_workers=self.cores)
stacked_features = numpy.array(stacked_features)
这样做会产生此错误:
File "/usr/local/Cellar/python/3.7.2_2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 136, in _check_not_importing_main
is not going to be frozen to produce an executable.''')
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
我尝试了多种添加 if __name__ == '__main__':
的方法,但都没有成功。这可以是 reproduced by running this example。如果有人能帮我解决这个问题,我将不胜感激。我不知道应该如何更改我的代码以使其工作。
谢谢。
编辑:示例是 cu_training.py
。
Client
命令启动新进程,因此它必须位于 if __name__ == '__main__':
块内,如 SO question or this GitHub issue
这与多处理模块相同
即使在包含 main if __name__ == '__main__':
之后,我的代码也遇到了几个问题。
我正在使用多个 python 文件和模块,而 multiprocess 仅由一个函数用于某些采样操作。 唯一对我有用的修复方法是在第一个文件和整个代码的第一行 中包含 main(甚至包括导入)。以下效果很好:
if __name__ == '__main__':
from mjrl.utils.gym_env import GymEnv
from mjrl.policies.gaussian_mlp import MLP
from mjrl.baselines.quadratic_baseline import QuadraticBaseline
from mjrl.baselines.mlp_baseline import MLPBaseline
from mjrl.algos.npg_cg import NPG
from mjrl.algos.dapg import DAPG
from mjrl.algos.behavior_cloning import BC
from mjrl.utils.train_agent import train_agent
from mjrl.samplers.core import sample_paths
import os
import json
import mjrl.envs
import mj_envs
import time as timer
import pickle
import argparse
import numpy as np
# ===============================================================================
# Get command line arguments
# ===============================================================================
parser = argparse.ArgumentParser(description='Policy gradient algorithms with demonstration data.')
parser.add_argument('--output', type=str, required=True, help='location to store results')
parser.add_argument('--config', type=str, required=True, help='path to config file with exp params')
args = parser.parse_args()
JOB_DIR = args.output
if not os.path.exists(JOB_DIR):
os.mkdir(JOB_DIR)
with open(args.config, 'r') as f:
job_data = eval(f.read())
assert 'algorithm' in job_data.keys()
assert any([job_data['algorithm'] == a for a in ['NPG', 'BCRL', 'DAPG']])
job_data['lam_0'] = 0.0 if 'lam_0' not in job_data.keys() else job_data['lam_0']
job_data['lam_1'] = 0.0 if 'lam_1' not in job_data.keys() else job_data['lam_1']
EXP_FILE = JOB_DIR + '/job_config.json'
with open(EXP_FILE, 'w') as f:
json.dump(job_data, f, indent=4)
# ===============================================================================
# Train Loop
# ===============================================================================
e = GymEnv(job_data['env'])
policy = MLP(e.spec, hidden_sizes=job_data['policy_size'], seed=job_data['seed'])
baseline = MLPBaseline(e.spec, reg_coef=1e-3, batch_size=job_data['vf_batch_size'],
epochs=job_data['vf_epochs'], learn_rate=job_data['vf_learn_rate'])