已尝试在当前进程完成其引导阶段之前启动新进程

An attempt has been made to start a new process before the current process has finished its bootstrapping phase

我是 dask 的新手,我发现有一个模块可以很容易地实现并行化,这真是太好了。我正在做一个项目,在这个项目中,我能够在一台机器上将一个循环并行化为 you can see here 。但是,我想转到 dask.distributed。我对上面的 class 应用了以下更改:

diff --git a/mlchem/fingerprints/gaussian.py b/mlchem/fingerprints/gaussian.py
index ce6a72b..89f8638 100644
--- a/mlchem/fingerprints/gaussian.py
+++ b/mlchem/fingerprints/gaussian.py
@@ -6,7 +6,7 @@ from sklearn.externals import joblib
 from .cutoff import Cosine
 from collections import OrderedDict
 import dask
-import dask.multiprocessing
+from dask.distributed import Client
 import time


@@ -141,13 +141,14 @@ class Gaussian(object):
         for image in images.items():
             computations.append(self.fingerprints_per_image(image))

+        client = Client()
         if self.scaler is None:
-            feature_space = dask.compute(*computations, scheduler='processes',
+            feature_space = dask.compute(*computations, scheduler='distributed',
                                          num_workers=self.cores)
             feature_space = OrderedDict(feature_space)
         else:
             stacked_features = dask.compute(*computations,
-                                            scheduler='processes',
+                                            scheduler='distributed',
                                             num_workers=self.cores)

             stacked_features = numpy.array(stacked_features)

这样做会产生此错误:

 File "/usr/local/Cellar/python/3.7.2_2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 136, in _check_not_importing_main
    is not going to be frozen to produce an executable.''')
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

我尝试了多种添加 if __name__ == '__main__': 的方法,但都没有成功。这可以是 reproduced by running this example。如果有人能帮我解决这个问题,我将不胜感激。我不知道应该如何更改我的代码以使其工作。

谢谢。

编辑:示例是 cu_training.py

Client 命令启动新进程,因此它必须位于 if __name__ == '__main__': 块内,如 SO question or this GitHub issue

中所述

这与多处理模块相同

即使在包含 main if __name__ == '__main__': 之后,我的代码也遇到了几个问题。

我正在使用多个 python 文件和模块,而 multiprocess 仅由一个函数用于某些采样操作。 唯一对我有用的修复方法是在第一个文件和整个代码的第一行 中包含 main(甚至包括导入)。以下效果很好:

if __name__ == '__main__':
    from mjrl.utils.gym_env import GymEnv
    from mjrl.policies.gaussian_mlp import MLP
    from mjrl.baselines.quadratic_baseline import QuadraticBaseline
    from mjrl.baselines.mlp_baseline import MLPBaseline
    from mjrl.algos.npg_cg import NPG
    from mjrl.algos.dapg import DAPG
    from mjrl.algos.behavior_cloning import BC
    from mjrl.utils.train_agent import train_agent
    from mjrl.samplers.core import sample_paths
    import os
    import json
    import mjrl.envs
    import mj_envs
    import time as timer
    import pickle
    import argparse

    import numpy as np 

    # ===============================================================================
    # Get command line arguments
    # ===============================================================================

    parser = argparse.ArgumentParser(description='Policy gradient algorithms with demonstration data.')
    parser.add_argument('--output', type=str, required=True, help='location to store results')
    parser.add_argument('--config', type=str, required=True, help='path to config file with exp params')
    args = parser.parse_args()
    JOB_DIR = args.output
    if not os.path.exists(JOB_DIR):
        os.mkdir(JOB_DIR)
    with open(args.config, 'r') as f:
        job_data = eval(f.read())
    assert 'algorithm' in job_data.keys()
    assert any([job_data['algorithm'] == a for a in ['NPG', 'BCRL', 'DAPG']])
    job_data['lam_0'] = 0.0 if 'lam_0' not in job_data.keys() else job_data['lam_0']
    job_data['lam_1'] = 0.0 if 'lam_1' not in job_data.keys() else job_data['lam_1']
    EXP_FILE = JOB_DIR + '/job_config.json'
    with open(EXP_FILE, 'w') as f:
        json.dump(job_data, f, indent=4)

    # ===============================================================================
    # Train Loop
    # ===============================================================================

    e = GymEnv(job_data['env'])
    policy = MLP(e.spec, hidden_sizes=job_data['policy_size'], seed=job_data['seed'])
    baseline = MLPBaseline(e.spec, reg_coef=1e-3, batch_size=job_data['vf_batch_size'],
                           epochs=job_data['vf_epochs'], learn_rate=job_data['vf_learn_rate'])