我如何在单个线程中 运行 dask.distributed 集群?
How do I run a dask.distributed cluster in a single thread?
如何在单个线程中 运行 一个完整的 Dask.distributed 集群?我想用它来调试或分析。
注意:这是一个常见问题。我将这里的问题和答案添加到 Stack Overflow 以供将来重用。
本地调度程序
如果您可以使用单机调度程序的API(只是计算),那么您可以使用单线程调度程序
x.compute(scheduler='single-threaded')
分布式调度程序 - 单机
如果您想 运行 在单台机器上建立 dask.distributed 集群,您可以不带参数启动客户端
from dask.distributed import Client
client = Client() # Starts local cluster
x.compute()
这使用很多线程但在一台机器上运行
分布式调度程序 - 单进程
或者,如果您想 运行 在单个进程中处理所有内容,则可以使用 processes=False
关键字
from dask.distributed import Client
client = Client(processes=False) # Starts local cluster
x.compute()
尽管计算发生在单独的线程池中,但所有通信和控制都发生在单个线程中。
分布式调度程序 - 单线程
为了 运行 在单个线程中进行控制、通信和计算,您需要创建一个 Tornado concurrent.futures 执行器。当心,这个 Tornado API 可能不是 public.
from dask.distributed import Scheduler, Worker, Client
from tornado.concurrent import DummyExecutor
from tornado.ioloop import IOLoop
import threading
loop = IOLoop()
e = DummyExecutor()
s = Scheduler(loop=loop)
s.start()
w = Worker(s.address, loop=loop, executor=e)
loop.add_callback(w._start)
async def f():
async with Client(s.address, start=False) as c:
future = c.submit(threading.get_ident)
result = await future
return result
>>> threading.get_ident() == loop.run_sync(f)
True
如何在单个线程中 运行 一个完整的 Dask.distributed 集群?我想用它来调试或分析。
注意:这是一个常见问题。我将这里的问题和答案添加到 Stack Overflow 以供将来重用。
本地调度程序
如果您可以使用单机调度程序的API(只是计算),那么您可以使用单线程调度程序
x.compute(scheduler='single-threaded')
分布式调度程序 - 单机
如果您想 运行 在单台机器上建立 dask.distributed 集群,您可以不带参数启动客户端
from dask.distributed import Client
client = Client() # Starts local cluster
x.compute()
这使用很多线程但在一台机器上运行
分布式调度程序 - 单进程
或者,如果您想 运行 在单个进程中处理所有内容,则可以使用 processes=False
关键字
from dask.distributed import Client
client = Client(processes=False) # Starts local cluster
x.compute()
尽管计算发生在单独的线程池中,但所有通信和控制都发生在单个线程中。
分布式调度程序 - 单线程
为了 运行 在单个线程中进行控制、通信和计算,您需要创建一个 Tornado concurrent.futures 执行器。当心,这个 Tornado API 可能不是 public.
from dask.distributed import Scheduler, Worker, Client
from tornado.concurrent import DummyExecutor
from tornado.ioloop import IOLoop
import threading
loop = IOLoop()
e = DummyExecutor()
s = Scheduler(loop=loop)
s.start()
w = Worker(s.address, loop=loop, executor=e)
loop.add_callback(w._start)
async def f():
async with Client(s.address, start=False) as c:
future = c.submit(threading.get_ident)
result = await future
return result
>>> threading.get_ident() == loop.run_sync(f)
True