无法在 python 3 的子进程中使用分发 LocalCluster
Unable to use distribute LocalCluster in subprocess in python 3
在带有 python 3 的子进程中使用分发的 LocalCluster 时出现错误(python 2 工作正常)。我有以下最小示例(我正在使用 python 3.6,分布式 1.23.3,龙卷风 5.1.1):
import multiprocessing
from distributed import LocalCluster
from distributed import Client
def call_client(cluster_address):
with Client(cluster_address):
pass
def main():
cluster = LocalCluster(n_workers=2)
print(cluster.workers)
process = multiprocessing.Process(
target=call_client, args=(cluster.scheduler.address, )
)
process.start()
process.join()
if __name__ == "__main__":
main()
执行文件时出现以下错误消息:
user@9b97e84a3c58:/workspace$ python test.py
[<Nanny: tcp://127.0.0.1:35779, threads: 2>, <Nanny: tcp://127.0.0.1:40211, threads: 2>]
Process Process-3:
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "test.py", line 10, in call_client
with Client(cluster_address):
File "/home/user/venv/lib/python3.6/site-packages/distributed/client.py", line 610, in __init__
self.start(timeout=timeout)
File "/home/user/venv/lib/python3.6/site-packages/distributed/client.py", line 733, in start
sync(self.loop, self._start, **kwargs)
File "/home/user/venv/lib/python3.6/site-packages/distributed/utils.py", line 277, in sync
six.reraise(*error[0])
File "/home/user/venv/lib/python3.6/site-packages/six.py", line 693, in reraise
raise value
File "/home/user/venv/lib/python3.6/site-packages/distributed/utils.py", line 262, in f
result[0] = yield future
File "/home/user/venv/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
value = future.result()
File "/home/user/venv/lib/python3.6/site-packages/tornado/gen.py", line 1141, in run
yielded = self.gen.throw(*exc_info)
File "/home/user/venv/lib/python3.6/site-packages/distributed/client.py", line 821, in _start
yield self._ensure_connected(timeout=timeout)
File "/home/user/venv/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
value = future.result()
File "/home/user/venv/lib/python3.6/site-packages/tornado/gen.py", line 1141, in run
yielded = self.gen.throw(*exc_info)
File "/home/user/venv/lib/python3.6/site-packages/distributed/client.py", line 862, in _ensure_connected
self._update_scheduler_info())
File "/home/user/venv/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
value = future.result()
tornado.util.TimeoutError: Timeout
使用 spawn 似乎有效。我怀疑有些状态不能很好地分叉。
process = multiprocessing.get_context('spawn').Process(...)
由于我最初的问题是在 Flask 应用程序中启动子进程,因此我无法按照 MRocklin 在另一个答案中的建议使用 'spawn'
。我现在的工作解决方案是我不在主进程中调用 cluster = LocalCluster(n_workers=2)
而是在子进程中启动它:
import sys
import multiprocessing
import signal
from functools import partial
from distributed import LocalCluster
from distributed import Client
def _stop_cluster(cluster, *args):
cluster.close()
sys.exit(0)
def _start_local_cluster(q, n_workers):
cluster = LocalCluster(n_workers=n_workers)
q.put(cluster.scheduler.address)
# shut down cluster when process is terminated
signal.signal(signal.SIGTERM, partial(_stop_cluster, cluster))
# run forever
signal.pause()
def call_client(cluster_address):
with Client(cluster_address):
print("I am working")
def main():
q = multiprocessing.Queue()
p_dask = multiprocessing.Process(target=_start_local_cluster, args=(q, 2))
p_dask.start()
cluster_address = q.get()
process = multiprocessing.Process(
target=call_client, args=(cluster_address, )
)
process.start()
process.join()
p_dask.terminate()
if __name__ == "__main__":
main()
在带有 python 3 的子进程中使用分发的 LocalCluster 时出现错误(python 2 工作正常)。我有以下最小示例(我正在使用 python 3.6,分布式 1.23.3,龙卷风 5.1.1):
import multiprocessing
from distributed import LocalCluster
from distributed import Client
def call_client(cluster_address):
with Client(cluster_address):
pass
def main():
cluster = LocalCluster(n_workers=2)
print(cluster.workers)
process = multiprocessing.Process(
target=call_client, args=(cluster.scheduler.address, )
)
process.start()
process.join()
if __name__ == "__main__":
main()
执行文件时出现以下错误消息:
user@9b97e84a3c58:/workspace$ python test.py
[<Nanny: tcp://127.0.0.1:35779, threads: 2>, <Nanny: tcp://127.0.0.1:40211, threads: 2>]
Process Process-3:
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "test.py", line 10, in call_client
with Client(cluster_address):
File "/home/user/venv/lib/python3.6/site-packages/distributed/client.py", line 610, in __init__
self.start(timeout=timeout)
File "/home/user/venv/lib/python3.6/site-packages/distributed/client.py", line 733, in start
sync(self.loop, self._start, **kwargs)
File "/home/user/venv/lib/python3.6/site-packages/distributed/utils.py", line 277, in sync
six.reraise(*error[0])
File "/home/user/venv/lib/python3.6/site-packages/six.py", line 693, in reraise
raise value
File "/home/user/venv/lib/python3.6/site-packages/distributed/utils.py", line 262, in f
result[0] = yield future
File "/home/user/venv/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
value = future.result()
File "/home/user/venv/lib/python3.6/site-packages/tornado/gen.py", line 1141, in run
yielded = self.gen.throw(*exc_info)
File "/home/user/venv/lib/python3.6/site-packages/distributed/client.py", line 821, in _start
yield self._ensure_connected(timeout=timeout)
File "/home/user/venv/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
value = future.result()
File "/home/user/venv/lib/python3.6/site-packages/tornado/gen.py", line 1141, in run
yielded = self.gen.throw(*exc_info)
File "/home/user/venv/lib/python3.6/site-packages/distributed/client.py", line 862, in _ensure_connected
self._update_scheduler_info())
File "/home/user/venv/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
value = future.result()
tornado.util.TimeoutError: Timeout
使用 spawn 似乎有效。我怀疑有些状态不能很好地分叉。
process = multiprocessing.get_context('spawn').Process(...)
由于我最初的问题是在 Flask 应用程序中启动子进程,因此我无法按照 MRocklin 在另一个答案中的建议使用 'spawn'
。我现在的工作解决方案是我不在主进程中调用 cluster = LocalCluster(n_workers=2)
而是在子进程中启动它:
import sys
import multiprocessing
import signal
from functools import partial
from distributed import LocalCluster
from distributed import Client
def _stop_cluster(cluster, *args):
cluster.close()
sys.exit(0)
def _start_local_cluster(q, n_workers):
cluster = LocalCluster(n_workers=n_workers)
q.put(cluster.scheduler.address)
# shut down cluster when process is terminated
signal.signal(signal.SIGTERM, partial(_stop_cluster, cluster))
# run forever
signal.pause()
def call_client(cluster_address):
with Client(cluster_address):
print("I am working")
def main():
q = multiprocessing.Queue()
p_dask = multiprocessing.Process(target=_start_local_cluster, args=(q, 2))
p_dask.start()
cluster_address = q.get()
process = multiprocessing.Process(
target=call_client, args=(cluster_address, )
)
process.start()
process.join()
p_dask.terminate()
if __name__ == "__main__":
main()