如何保证每个worker恰好使用一个CPU?

How to ensure each worker use exactly one CPU?

我正在使用 ray 实现 SEED,因此,我定义了一个 Worker class 如下

import numpy as np
import gym

class Worker:
    def __init__(self, worker_id, env_name, n):
        import os
        os.environ['OPENBLAS_NUM_THREADS'] = '1'
        self._id = worker_id
        self._n_envs = n
        self._envs = [gym.make(env_name) 
            for _ in range(self._n_envs)]

    def reset_env(self, env_id):
        return self._envs[env_id].reset()

    def env_step(self, env_id, action):
        return self._envs[env_id].step(action)

除此之外,Leaner 中有一个循环,在需要与环境交互时调用 Worker 的方法。

正如 this document 所建议的那样,我想确保每个工作人员只使用一个 CPU 资源。这是我的一些尝试:

  1. 创建worker时,我设置num_cpus=1worker=ray.remote(num_cpus=1)(Worker).remote(...)
  2. 我使用 np.__config__.show() 检查了我的 numpy 配置,它给了我以下信息

blas_mkl_info: NOT AVAILABLE

blis_info: NOT AVAILABLE

openblas_info: libraries = ['openblas', 'openblas'] library_dirs = ['/usr/local/lib'] language = c define_macros = [('HAVE_CBLAS', None)]

blas_opt_info: libraries = ['openblas', 'openblas'] library_dirs = ['/usr/local/lib'] language = c define_macros = [('HAVE_CBLAS', None)]

lapack_mkl_info: NOT AVAILABLE

openblas_lapack_info: libraries = ['openblas', 'openblas'] library_dirs = ['/usr/local/lib'] language = c define_macros = [('HAVE_CBLAS', None)]

lapack_opt_info: libraries = ['openblas', 'openblas'] library_dirs = ['/usr/local/lib'] language = c define_macros = [('HAVE_CBLAS', None)]

我注意到 numpy 使用的是 OpenBLAS,所以我在 Worker class 中设置了 os.environ['OPENBLAS_NUM_THREADS'] = '1',因为上面的代码遵循 this instruction.

两者都完成后,打开top,还是发现每个Worker都使用了130%-180% CPUs,和之前一模一样。我还尝试在主 python 脚本的开头设置 os.environ['OPENBLAS_NUM_THREADS'] = '1' 或使用 export OPENBLAS_NUM_THREADS=1,但没有任何帮助。我现在可以做什么?

你可以把你的核心寄托在每个工人身上。例如,您可以使用 psutil.Process().cpu_affinity([i]) 之类的东西在每个 worker 上固定一个索引 i core 。

此外,在固定 cpu 之前,请务必了解此 api 已将什么 cpu 分配给该工作人员。 https://github.com/ray-project/ray/blob/203c077895ac422b80e31f062d33eadb89e66768/python/ray/worker.py#L457

示例:

ray.init(num_cpus=4)
@ray.remote(num_cpus=1) 
def f(): 
   import numpy 
   resources = ray.ray.get_resource_ids() 
   cpus = [v[0] for v in resources['CPU']]
   psutil.Process().cpu_affinity(cpus)