无法在 Python 3 的子进程中使用 distributed 的 LocalCluster

Unable to use distribute LocalCluster in subprocess in python 3

在 Python 3 的子进程中使用 distributed 的 LocalCluster 时出现错误(Python 2 工作正常)。我有以下最小示例(我使用的版本是 Python 3.6、distributed 1.23.3、tornado 5.1.1):

import multiprocessing

from distributed import LocalCluster
from distributed import Client



def call_client(cluster_address):
    # Minimal reproduction: connect a Client to the scheduler from a forked
    # child process and do nothing.  Under Python 3 + fork this hangs and
    # times out (see the traceback below); code kept exactly as reported.
    with Client(cluster_address):
        pass


def main():
    # Start the cluster in the PARENT process ...
    cluster = LocalCluster(n_workers=2)
    print(cluster.workers)

    # ... then fork a child that tries to connect to it.
    # NOTE(review): on Linux the default start method is fork, so the child
    # inherits the parent's tornado/IOLoop state — presumably what breaks
    # the connection here (spawn works, per the discussion below).
    process = multiprocessing.Process(
        target=call_client, args=(cluster.scheduler.address, )
    )
    process.start()
    process.join()


# Standard entry-point guard (also required if the spawn start method is used).
if __name__ == "__main__":
    main()

执行文件时出现以下错误消息:

user@9b97e84a3c58:/workspace$ python test.py
[<Nanny: tcp://127.0.0.1:35779, threads: 2>, <Nanny: tcp://127.0.0.1:40211, threads: 2>]
Process Process-3:
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "test.py", line 10, in call_client
    with Client(cluster_address):
  File "/home/user/venv/lib/python3.6/site-packages/distributed/client.py", line 610, in __init__
    self.start(timeout=timeout)
  File "/home/user/venv/lib/python3.6/site-packages/distributed/client.py", line 733, in start
    sync(self.loop, self._start, **kwargs)
  File "/home/user/venv/lib/python3.6/site-packages/distributed/utils.py", line 277, in sync
    six.reraise(*error[0])
  File "/home/user/venv/lib/python3.6/site-packages/six.py", line 693, in reraise
    raise value
  File "/home/user/venv/lib/python3.6/site-packages/distributed/utils.py", line 262, in f
    result[0] = yield future
  File "/home/user/venv/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/home/user/venv/lib/python3.6/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/user/venv/lib/python3.6/site-packages/distributed/client.py", line 821, in _start
    yield self._ensure_connected(timeout=timeout)
  File "/home/user/venv/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/home/user/venv/lib/python3.6/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/user/venv/lib/python3.6/site-packages/distributed/client.py", line 862, in _ensure_connected
    self._update_scheduler_info())
  File "/home/user/venv/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
tornado.util.TimeoutError: Timeout

使用 spawn 启动方式似乎有效。我怀疑某些内部状态在 fork 之后无法正常工作。

process = multiprocessing.get_context('spawn').Process(...)

由于我最初的问题是在 Flask 应用程序中启动子进程,因此我无法按照 MRocklin 在另一个答案中的建议使用 'spawn'。我现在的工作解决方案是我不在主进程中调用 cluster = LocalCluster(n_workers=2) 而是在子进程中启动它:

import sys
import multiprocessing
import signal
from functools import partial

from distributed import LocalCluster
from distributed import Client


def _stop_cluster(cluster, *args):
    cluster.close()
    sys.exit(0)


def _start_local_cluster(q, n_workers):
    """Run a LocalCluster inside this process until SIGTERM arrives.

    The scheduler address is published to the parent via queue *q* before
    this function blocks forever waiting for a signal.
    """
    local_cluster = LocalCluster(n_workers=n_workers)
    # Hand the scheduler address back to the parent first, so it can proceed.
    q.put(local_cluster.scheduler.address)

    # On SIGTERM, close the cluster and exit cleanly ...
    shutdown = partial(_stop_cluster, local_cluster)
    signal.signal(signal.SIGTERM, shutdown)
    # ... until then, sleep here indefinitely.
    signal.pause()


def call_client(cluster_address):
    """Connect a dask Client to *cluster_address* and confirm it works."""
    client = Client(cluster_address)
    with client:
        print("I am working")


def main():
    """Start the cluster in a helper process, run a client, then tear down.

    The LocalCluster is created in a dedicated subprocess (not the parent),
    which avoids the fork-related hang described in this issue.
    """
    q = multiprocessing.Queue()
    p_dask = multiprocessing.Process(target=_start_local_cluster, args=(q, 2))
    p_dask.start()
    # Blocks until the cluster subprocess publishes its scheduler address.
    cluster_address = q.get()

    process = multiprocessing.Process(
        target=call_client, args=(cluster_address, )
    )
    process.start()
    process.join()

    # SIGTERM triggers _stop_cluster in the cluster process; join() waits for
    # that clean shutdown to finish so we don't leave a zombie process behind.
    p_dask.terminate()
    p_dask.join()


# Standard entry-point guard so importing this module has no side effects.
if __name__ == "__main__":
    main()