使用 SSHCluster 客户端的 Dask 期货未并行化

Dask futures using SSHCluster client not parallalelizing

我是 Dask 的新手,我目前正在尝试使用期货制作一个简单的例子,但我似乎无法让它发挥作用。我的目标是使用 futures 获取我所有节点的主机名(我有一个由 3 个节点组成的集群,其中 2 个是工作节点,一个是调度程序)。 为此,我创建了一个休眠 2 秒的函数,然后获取主机名。但是,当我多次启动该功能时,我似乎只获得了一个节点的主机名,而不是 2 个。这是我的代码,输出:

#!/usr/bin/python3
from dask.distributed import Client, SSHCluster
from time import sleep
import dask
from dask import delayed
import socket
import time


cluster=SSHCluster(["10.20.3.1","c002-interconnect-1","c003-interconnect-1"],connect_options={"known_hosts":None},worker_options={},scheduler_options={"port":0,"dashboard_address":":8788"})
client=Client(cluster)
client.cluster.scale(2)

def gethost():
    sleep(2)
    return socket.gethostname()



futures=[]
workers=(client.scheduler_info()['workers'])
print("workers details")


start=time.time()

for i in workers.keys():
    l=(workers[i]['nthreads'])
    l2=(workers[i]['metrics']['cpu'])
    print("worker :"+i)
    print('number of cpu: ')
    print(l2)
    print('number of threads:')
    print(l)
    print("##########")
    for j in range(int((l*(l2+10)))):
        future=client.submit(gethost)
        futures.append(future)

results=[future.result() for future in futures]

end=time.time()
print("results:")
print(results)
print("time: ")
print(end-start)

这是我得到的输出:


distributed.deploy.ssh - INFO - distributed.scheduler - INFO - -----------------------------------------------
distributed.deploy.ssh - INFO - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.deploy.ssh - INFO - /usr/local/lib/python3.6/site-packages/distributed/node.py:155: UserWarning: Port 8788 is already in use.
distributed.deploy.ssh - INFO - Perhaps you already have a cluster running?
distributed.deploy.ssh - INFO - Hosting the HTTP server on port 39850 instead
distributed.deploy.ssh - INFO - http_address["port"], self.http_server.port
distributed.deploy.ssh - INFO - distributed.scheduler - INFO - -----------------------------------------------
distributed.deploy.ssh - INFO - distributed.scheduler - INFO - Clear task state
distributed.deploy.ssh - INFO - distributed.scheduler - INFO -   Scheduler at:     tcp://10.20.3.1:41283
distributed.deploy.ssh - INFO - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.20.3.3:46321'
distributed.deploy.ssh - INFO - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.20.3.2:32907'
distributed.deploy.ssh - INFO - distributed.worker - INFO -       Start worker at:      tcp://10.20.3.3:35165
distributed.deploy.ssh - INFO - distributed.worker - INFO -       Start worker at:      tcp://10.20.3.2:33232
workers details
worker :tcp://10.20.3.2:33232
number of cpu:
0.0
number of threads:
12
##########
worker :tcp://10.20.3.3:35165
number of cpu:
0.0
number of threads:
12
##########
results:
['c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002']
time:
3.2259795665740967

如果您需要更多详细信息,请告诉我。 这也是我的第一个堆栈溢出问题,如果我做错了什么,请告诉我!

可能需要在工人上导入 socket

def gethost():
    sleep(2)
    import socket
    return socket.gethostname()

此外,client.submit 部分应指定由哪个 worker 执行函数:

for i in workers.keys():
   # code skipped
   future = client.submit(gethost, pure=False, workers=[i])

我目前无法尝试,但希望这对您有所帮助。