将数据分散（scatter）到 Dask 集群的 worker 时报错：未知地址方案 'gateway'
Scattering data to dask cluster workers: unknown address scheme 'gateway'
我正在按照一个在线回答中的代码（“先分块再分散”部分）进行操作，但在尝试将 pandas.DataFrame 分散到各个 worker 时出现了一个奇怪的错误。
如果有关系的话：我使用的是 Jupyter Notebook。
我不确定这个错误是什么意思，它很晦涩，所以非常感谢任何帮助。
# Reproduction of the failing "chunk then scatter" recipe, run against a
# Dask Gateway cluster. The client.scatter call below is the one that raises
# ValueError("unknown address scheme 'gateway' ...") in the traceback.
from dask_gateway import Gateway
import dask.dataframe as dd
import dask

# Start a Gateway-managed cluster (cluster options passed through unmodified),
# scale it to 10 workers, and connect a client to it.
gateway = Gateway()
options = gateway.cluster_options()
cluster = gateway.new_cluster(cluster_options=options)
cluster.scale(10)
client = cluster.get_client()

X_train = ... # build pandas.DataFrame
# Split the pandas DataFrame into 10 partitions. Per the answer below,
# from_pandas already does the partitioning, so no manual scatter is needed.
x = dd.from_pandas(X_train, npartitions=10)
# NOTE(review): `get=` is the legacy scheduler keyword (newer Dask uses
# `scheduler=`), and mixing a local threaded scheduler with a distributed
# client is no longer recommended.
x = x.persist(get=dask.threaded.get) # chunk locally
# NOTE(review): in current Dask `x.dask` is a high-level graph rather than the
# low-level dict this recipe assumed; this is the call that fails.
futures = client.scatter(dict(x.dask)) # scatter chunks
# NOTE(review): assigning `x` to its own graph attribute looks like a typo;
# the original recipe presumably re-assigned the graph from `futures` — confirm.
x.dask = x
---------------------------------------------------------------------------
Exception Traceback (most recent call last)
/tmp/ipykernel_567/3586545525.py in <module>
1 x = dd.from_pandas(X_train, npartitions=10)
2 x = x.persist(get=dask.threaded.get) # chunk locally
----> 3 futures = client.scatter(dict(x.dask)) # scatter chunks
4 x.dask = x
/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/client.py in scatter(self, data, workers, broadcast, direct, hash, timeout, asynchronous)
2182 else:
2183 local_worker = None
-> 2184 return self.sync(
2185 self._scatter,
2186 data,
/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
866 return future
867 else:
--> 868 return sync(
869 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
870 )
/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
330 if error[0]:
331 typ, exc, tb = error[0]
--> 332 raise exc.with_traceback(tb)
333 else:
334 return result[0]
/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/utils.py in f()
313 if callback_timeout is not None:
314 future = asyncio.wait_for(future, callback_timeout)
--> 315 result[0] = yield future
316 except Exception:
317 error[0] = sys.exc_info()
/srv/conda/envs/notebook/lib/python3.9/site-packages/tornado/gen.py in run(self)
760
761 try:
--> 762 value = future.result()
763 except Exception:
764 exc_info = sys.exc_info()
/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/client.py in _scatter(self, data, workers, broadcast, direct, local_worker, timeout, hash)
2004 isinstance(k, (bytes, str)) for k in data
2005 ):
-> 2006 d = await self._scatter(keymap(stringify, data), workers, broadcast)
2007 return {k: d[stringify(k)] for k in data}
2008
/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/client.py in _scatter(self, data, workers, broadcast, direct, local_worker, timeout, hash)
2073 )
2074 else:
-> 2075 await self.scheduler.scatter(
2076 data=data2,
2077 workers=workers,
/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/core.py in send_recv_from_rpc(**kwargs)
893 name, comm.name = comm.name, "ConnectionPool." + key
894 try:
--> 895 result = await send_recv(comm=comm, op=key, **kwargs)
896 finally:
897 self.pool.reuse(self.addr, comm)
/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/core.py in send_recv(comm, reply, serializers, deserializers, **kwargs)
686 if comm.deserialize:
687 typ, exc, tb = clean_exception(**response)
--> 688 raise exc.with_traceback(tb)
689 else:
690 raise Exception(response["exception_text"])
/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/core.py in handle_comm()
528 result = asyncio.ensure_future(result)
529 self._ongoing_coroutines.add(result)
--> 530 result = await result
531 except (CommClosedError, CancelledError):
532 if self.status in (Status.running, Status.paused):
/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/scheduler.py in scatter()
5795 assert isinstance(data, dict)
5796
-> 5797 keys, who_has, nbytes = await scatter_to_workers(
5798 nthreads, data, rpc=self.rpc, report=False
5799 )
/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/utils_comm.py in scatter_to_workers()
143 rpcs = {addr: rpc(addr) for addr in d}
144 try:
--> 145 out = await All(
146 [
147 rpcs[address].update_data(
/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/utils.py in All()
214 while not tasks.done():
215 try:
--> 216 result = await tasks.next()
217 except Exception:
218
/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/core.py in send_recv_from_rpc()
893 name, comm.name = comm.name, "ConnectionPool." + key
894 try:
--> 895 result = await send_recv(comm=comm, op=key, **kwargs)
896 finally:
897 self.pool.reuse(self.addr, comm)
/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/core.py in send_recv()
688 raise exc.with_traceback(tb)
689 else:
--> 690 raise Exception(response["exception_text"])
691 return response
692
Exception: ValueError("unknown address scheme 'gateway' (known schemes: ['inproc', 'tcp', 'tls', 'ucx', 'ws', 'wss'])")
`dd.from_pandas()` 会在内部完成这种“先分区再分散”（partitioning-then-scattering）的操作，因此您不必再手动执行。您可以直接在 `x` 上使用 Dask DataFrame API，计算应该会自动在您的集群上运行。:)
您参考的那个回答是 5 年前的，现在已经过时了，因为 Dask 从那以后已经成熟了很多。例如，`x.dask` 现在指的是“高级图”（HighLevelGraph，这是较新加入的功能），而不是低级图。Dask Gateway 使用它自己的地址方案（`gateway://`），我猜它无法与这种旧的 Dask 用法正确配合。
另外，请注意，现在已不再推荐混用调度器（如该回答中那样）。
我正在按照一个在线回答中的代码（“先分块再分散”部分）进行操作，但在尝试将 pandas.DataFrame 分散到各个 worker 时出现了一个奇怪的错误。
如果有关系的话：我使用的是 Jupyter Notebook。
我不确定这个错误是什么意思，它很晦涩，所以非常感谢任何帮助。
# Reproduction of the failing "chunk then scatter" recipe, run against a
# Dask Gateway cluster. The client.scatter call below is the one that raises
# ValueError("unknown address scheme 'gateway' ...") in the traceback.
from dask_gateway import Gateway
import dask.dataframe as dd
import dask

# Start a Gateway-managed cluster (cluster options passed through unmodified),
# scale it to 10 workers, and connect a client to it.
gateway = Gateway()
options = gateway.cluster_options()
cluster = gateway.new_cluster(cluster_options=options)
cluster.scale(10)
client = cluster.get_client()

X_train = ... # build pandas.DataFrame
# Split the pandas DataFrame into 10 partitions. Per the answer below,
# from_pandas already does the partitioning, so no manual scatter is needed.
x = dd.from_pandas(X_train, npartitions=10)
# NOTE(review): `get=` is the legacy scheduler keyword (newer Dask uses
# `scheduler=`), and mixing a local threaded scheduler with a distributed
# client is no longer recommended.
x = x.persist(get=dask.threaded.get) # chunk locally
# NOTE(review): in current Dask `x.dask` is a high-level graph rather than the
# low-level dict this recipe assumed; this is the call that fails.
futures = client.scatter(dict(x.dask)) # scatter chunks
# NOTE(review): assigning `x` to its own graph attribute looks like a typo;
# the original recipe presumably re-assigned the graph from `futures` — confirm.
x.dask = x
---------------------------------------------------------------------------
Exception Traceback (most recent call last)
/tmp/ipykernel_567/3586545525.py in <module>
1 x = dd.from_pandas(X_train, npartitions=10)
2 x = x.persist(get=dask.threaded.get) # chunk locally
----> 3 futures = client.scatter(dict(x.dask)) # scatter chunks
4 x.dask = x
/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/client.py in scatter(self, data, workers, broadcast, direct, hash, timeout, asynchronous)
2182 else:
2183 local_worker = None
-> 2184 return self.sync(
2185 self._scatter,
2186 data,
/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
866 return future
867 else:
--> 868 return sync(
869 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
870 )
/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
330 if error[0]:
331 typ, exc, tb = error[0]
--> 332 raise exc.with_traceback(tb)
333 else:
334 return result[0]
/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/utils.py in f()
313 if callback_timeout is not None:
314 future = asyncio.wait_for(future, callback_timeout)
--> 315 result[0] = yield future
316 except Exception:
317 error[0] = sys.exc_info()
/srv/conda/envs/notebook/lib/python3.9/site-packages/tornado/gen.py in run(self)
760
761 try:
--> 762 value = future.result()
763 except Exception:
764 exc_info = sys.exc_info()
/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/client.py in _scatter(self, data, workers, broadcast, direct, local_worker, timeout, hash)
2004 isinstance(k, (bytes, str)) for k in data
2005 ):
-> 2006 d = await self._scatter(keymap(stringify, data), workers, broadcast)
2007 return {k: d[stringify(k)] for k in data}
2008
/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/client.py in _scatter(self, data, workers, broadcast, direct, local_worker, timeout, hash)
2073 )
2074 else:
-> 2075 await self.scheduler.scatter(
2076 data=data2,
2077 workers=workers,
/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/core.py in send_recv_from_rpc(**kwargs)
893 name, comm.name = comm.name, "ConnectionPool." + key
894 try:
--> 895 result = await send_recv(comm=comm, op=key, **kwargs)
896 finally:
897 self.pool.reuse(self.addr, comm)
/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/core.py in send_recv(comm, reply, serializers, deserializers, **kwargs)
686 if comm.deserialize:
687 typ, exc, tb = clean_exception(**response)
--> 688 raise exc.with_traceback(tb)
689 else:
690 raise Exception(response["exception_text"])
/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/core.py in handle_comm()
528 result = asyncio.ensure_future(result)
529 self._ongoing_coroutines.add(result)
--> 530 result = await result
531 except (CommClosedError, CancelledError):
532 if self.status in (Status.running, Status.paused):
/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/scheduler.py in scatter()
5795 assert isinstance(data, dict)
5796
-> 5797 keys, who_has, nbytes = await scatter_to_workers(
5798 nthreads, data, rpc=self.rpc, report=False
5799 )
/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/utils_comm.py in scatter_to_workers()
143 rpcs = {addr: rpc(addr) for addr in d}
144 try:
--> 145 out = await All(
146 [
147 rpcs[address].update_data(
/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/utils.py in All()
214 while not tasks.done():
215 try:
--> 216 result = await tasks.next()
217 except Exception:
218
/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/core.py in send_recv_from_rpc()
893 name, comm.name = comm.name, "ConnectionPool." + key
894 try:
--> 895 result = await send_recv(comm=comm, op=key, **kwargs)
896 finally:
897 self.pool.reuse(self.addr, comm)
/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/core.py in send_recv()
688 raise exc.with_traceback(tb)
689 else:
--> 690 raise Exception(response["exception_text"])
691 return response
692
Exception: ValueError("unknown address scheme 'gateway' (known schemes: ['inproc', 'tcp', 'tls', 'ucx', 'ws', 'wss'])")
`dd.from_pandas()` 会在内部完成这种“先分区再分散”（partitioning-then-scattering）的操作，因此您不必再手动执行。您可以直接在 `x` 上使用 Dask DataFrame API，计算应该会自动在您的集群上运行。:)
`x.dask` 现在指的是“高级图”（HighLevelGraph，这是较新加入的功能），而不是低级图。Dask Gateway 使用它自己的地址方案（`gateway://`），我猜它无法与这种旧的 Dask 用法正确配合。
另外，请注意，现在已不再推荐混用调度器（如该回答中那样）。