使用 SSHCluster 客户端的 Dask 期货未并行化
Dask futures using SSHCluster client not parallalelizing
我是 Dask 的新手,我目前正在尝试使用期货制作一个简单的例子,但我似乎无法让它发挥作用。我的目标是使用 futures 获取我所有节点的主机名(我有一个由 3 个节点组成的集群,其中 2 个是工作节点,一个是调度程序)。
为此,我创建了一个休眠 2 秒的函数,然后获取主机名。但是,当我多次启动该功能时,我似乎只获得了一个节点的主机名,而不是 2 个。这是我的代码,输出:
#!/usr/bin/python3
from dask.distributed import Client, SSHCluster
from time import sleep
import dask
from dask import delayed
import socket
import time
cluster=SSHCluster(["10.20.3.1","c002-interconnect-1","c003-interconnect-1"],connect_options={"known_hosts":None},worker_options={},scheduler_options={"port":0,"dashboard_address":":8788"})
client=Client(cluster)
client.cluster.scale(2)
def gethost():
sleep(2)
return socket.gethostname()
futures=[]
workers=(client.scheduler_info()['workers'])
print("workers details")
start=time.time()
for i in workers.keys():
l=(workers[i]['nthreads'])
l2=(workers[i]['metrics']['cpu'])
print("worker :"+i)
print('number of cpu: ')
print(l2)
print('number of threads:')
print(l)
print("##########")
for j in range(int((l*(l2+10)))):
future=client.submit(gethost)
futures.append(future)
results=[future.result() for future in futures]
end=time.time()
print("results:")
print(results)
print("time: ")
print(end-start)
这是我得到的输出:
distributed.deploy.ssh - INFO - distributed.scheduler - INFO - -----------------------------------------------
distributed.deploy.ssh - INFO - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.deploy.ssh - INFO - /usr/local/lib/python3.6/site-packages/distributed/node.py:155: UserWarning: Port 8788 is already in use.
distributed.deploy.ssh - INFO - Perhaps you already have a cluster running?
distributed.deploy.ssh - INFO - Hosting the HTTP server on port 39850 instead
distributed.deploy.ssh - INFO - http_address["port"], self.http_server.port
distributed.deploy.ssh - INFO - distributed.scheduler - INFO - -----------------------------------------------
distributed.deploy.ssh - INFO - distributed.scheduler - INFO - Clear task state
distributed.deploy.ssh - INFO - distributed.scheduler - INFO - Scheduler at: tcp://10.20.3.1:41283
distributed.deploy.ssh - INFO - distributed.nanny - INFO - Start Nanny at: 'tcp://10.20.3.3:46321'
distributed.deploy.ssh - INFO - distributed.nanny - INFO - Start Nanny at: 'tcp://10.20.3.2:32907'
distributed.deploy.ssh - INFO - distributed.worker - INFO - Start worker at: tcp://10.20.3.3:35165
distributed.deploy.ssh - INFO - distributed.worker - INFO - Start worker at: tcp://10.20.3.2:33232
workers details
worker :tcp://10.20.3.2:33232
number of cpu:
0.0
number of threads:
12
##########
worker :tcp://10.20.3.3:35165
number of cpu:
0.0
number of threads:
12
##########
results:
['c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002']
time:
3.2259795665740967
如果您需要更多详细信息,请告诉我。
这也是我的第一个堆栈溢出问题,如果我做错了什么,请告诉我!
可能需要在工人上导入 socket
:
def gethost():
sleep(2)
import socket
return socket.gethostname()
此外,client.submit
部分应指定由哪个 worker 执行函数:
for i in workers.keys():
# code skipped
future = client.submit(gethost, pure=False, workers=[i])
我目前无法尝试,但希望这对您有所帮助。
我是 Dask 的新手,我目前正在尝试使用期货制作一个简单的例子,但我似乎无法让它发挥作用。我的目标是使用 futures 获取我所有节点的主机名(我有一个由 3 个节点组成的集群,其中 2 个是工作节点,一个是调度程序)。 为此,我创建了一个休眠 2 秒的函数,然后获取主机名。但是,当我多次启动该功能时,我似乎只获得了一个节点的主机名,而不是 2 个。这是我的代码,输出:
#!/usr/bin/python3
from dask.distributed import Client, SSHCluster
from time import sleep
import dask
from dask import delayed
import socket
import time
cluster=SSHCluster(["10.20.3.1","c002-interconnect-1","c003-interconnect-1"],connect_options={"known_hosts":None},worker_options={},scheduler_options={"port":0,"dashboard_address":":8788"})
client=Client(cluster)
client.cluster.scale(2)
def gethost():
sleep(2)
return socket.gethostname()
futures=[]
workers=(client.scheduler_info()['workers'])
print("workers details")
start=time.time()
for i in workers.keys():
l=(workers[i]['nthreads'])
l2=(workers[i]['metrics']['cpu'])
print("worker :"+i)
print('number of cpu: ')
print(l2)
print('number of threads:')
print(l)
print("##########")
for j in range(int((l*(l2+10)))):
future=client.submit(gethost)
futures.append(future)
results=[future.result() for future in futures]
end=time.time()
print("results:")
print(results)
print("time: ")
print(end-start)
这是我得到的输出:
distributed.deploy.ssh - INFO - distributed.scheduler - INFO - -----------------------------------------------
distributed.deploy.ssh - INFO - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.deploy.ssh - INFO - /usr/local/lib/python3.6/site-packages/distributed/node.py:155: UserWarning: Port 8788 is already in use.
distributed.deploy.ssh - INFO - Perhaps you already have a cluster running?
distributed.deploy.ssh - INFO - Hosting the HTTP server on port 39850 instead
distributed.deploy.ssh - INFO - http_address["port"], self.http_server.port
distributed.deploy.ssh - INFO - distributed.scheduler - INFO - -----------------------------------------------
distributed.deploy.ssh - INFO - distributed.scheduler - INFO - Clear task state
distributed.deploy.ssh - INFO - distributed.scheduler - INFO - Scheduler at: tcp://10.20.3.1:41283
distributed.deploy.ssh - INFO - distributed.nanny - INFO - Start Nanny at: 'tcp://10.20.3.3:46321'
distributed.deploy.ssh - INFO - distributed.nanny - INFO - Start Nanny at: 'tcp://10.20.3.2:32907'
distributed.deploy.ssh - INFO - distributed.worker - INFO - Start worker at: tcp://10.20.3.3:35165
distributed.deploy.ssh - INFO - distributed.worker - INFO - Start worker at: tcp://10.20.3.2:33232
workers details
worker :tcp://10.20.3.2:33232
number of cpu:
0.0
number of threads:
12
##########
worker :tcp://10.20.3.3:35165
number of cpu:
0.0
number of threads:
12
##########
results:
['c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002']
time:
3.2259795665740967
如果您需要更多详细信息,请告诉我。 这也是我的第一个堆栈溢出问题,如果我做错了什么,请告诉我!
可能需要在工人上导入 socket
:
def gethost():
sleep(2)
import socket
return socket.gethostname()
此外,client.submit
部分应指定由哪个 worker 执行函数:
for i in workers.keys():
# code skipped
future = client.submit(gethost, pure=False, workers=[i])
我目前无法尝试,但希望这对您有所帮助。