Understanding Dask Distributed behavior with regard to DataFrame operations

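I'd like to better understand how dask.distributed works. I have a simple CSV that I read into a Dask DataFrame, as shown below. This works fine and returns an integer representing the length of the DataFrame, which is the behavior I expect.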

import dask.dataframe as dd
gdf = dd.read_csv(filepath)
len(gdf)
# returns some int value

But as soon as I introduce an instance of Client from dask.distributed, I get the following error:

distributed.utils - ERROR - 'LocalFileSystem' object has no attribute 'cwd'

Here is an example code block:

from dask.distributed import Client
import dask.dataframe as dd
client_db = Client(remote_addr)
gdf = dd.read_csv(filepath)
len(gdf)
# throws the above error

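I'm confused: once instantiated, does the Client "inject itself" into all Dask operations? I thought I would need to do something like gdf = client_db.persist(gdf) to ask the Client connection to manage operations on that DataFrame.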

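Any background on what is happening here is much appreciated! From the traceback I can tell it has something to do with Tornado, a Python web framework that supports WebSockets, long polling, and so on. I'm assuming it is trying to store something... somewhere... but my familiarity ends here.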

Traceback, in case it helps:

Traceback (most recent call last):
  File "/.../geopandas_opt/venv/lib/python3.6/site-packages/distributed/utils.py", line 223, in f
    result[0] = yield make_coro()
  File "/.../geopandas_opt/venv/lib/python3.6/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/.../geopandas_opt/venv/lib/python3.6/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "/.../geopandas_opt/venv/lib/python3.6/site-packages/tornado/gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "/.../geopandas_opt/venv/lib/python3.6/site-packages/distributed/client.py", line 1156, in _gather
    traceback)
  File "/.../geopandas_opt/venv/lib/python3.6/site-packages/six.py", line 685, in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.5/site-packages/dask/bytes/core.py", line 212, in read_block_from_file
  File "/usr/local/lib/python3.5/site-packages/dask/bytes/core.py", line 314, in __enter__
  File "/usr/local/lib/python3.5/site-packages/dask/bytes/local.py", line 64, in open
  File "/usr/local/lib/python3.5/site-packages/dask/bytes/local.py", line 36, in _trim_filename
AttributeError: 'LocalFileSystem' object has no attribute 'cwd'

Yes, when you create a Client, it registers itself as the default global scheduler. You can avoid this behavior with the set_as_default= keyword:
client = Client(..., set_as_default=False)
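
To make the "registers itself as the default" behavior concrete, here is a toy model in plain Python. It is a hypothetical sketch, not Dask's implementation: ToyClient and compute are made-up names that only mimic the idea that constructing a client fills a process-wide "default scheduler" slot unless set_as_default=False is passed. With the real Client, a non-default client is used explicitly instead, for example via client.compute(...).

```python
# Toy model (hypothetical, NOT Dask's code) of a client that registers itself
# as the process-wide default scheduler when it is constructed.
from typing import Callable, List, Optional

_default_client: Optional["ToyClient"] = None  # the "global default scheduler" slot


class ToyClient:
    def __init__(self, address: str, set_as_default: bool = True) -> None:
        global _default_client
        self.address = address
        self.log: List[str] = []  # names of tasks this client has run
        if set_as_default:
            _default_client = self  # later compute() calls route through this client

    def run(self, fn: Callable[[], int]) -> int:
        self.log.append(fn.__name__)
        return fn()


def compute(fn: Callable[[], int]) -> int:
    """Run fn on the registered default client if there is one, else locally."""
    if _default_client is not None:
        return _default_client.run(fn)
    return fn()
```

Creating ToyClient("tcp://...", set_as_default=False) leaves compute() running locally, while a plain ToyClient("tcp://...") silently takes over every later compute() call, which is why len(gdf) started going through the remote cluster in the question.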

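As for the exception you are running into, I suspect a version mismatch. Note that your client runs from a python3.6 virtualenv, while the frames that actually fail come from /usr/local/lib/python3.5/site-packages/dask/... on the cluster side. Make sure the client, scheduler, and workers all run matching versions; you may want to upgrade with conda or pip: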
conda install dask distributed

pip install --upgrade dask distributed
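
To spot this kind of mismatch, recent releases of distributed provide Client.get_versions(check=True), which collects package versions from the client, scheduler, and workers and complains when they disagree (check the documentation for your release). The helper below is a hypothetical, stdlib-only sketch of the same comparison, assuming you already have each side's versions as a {package: version} dict; find_mismatches is not a Dask function.

```python
# Hypothetical helper (not part of Dask): compare the client's package versions
# with each worker's and list every package whose version differs.
from typing import Dict, List, Tuple

Versions = Dict[str, str]  # package name -> version string


def find_mismatches(
    client: Versions, workers: Dict[str, Versions]
) -> List[Tuple[str, str, str, str]]:
    """Return (worker, package, client_version, worker_version) for each mismatch."""
    mismatches = []
    for worker, versions in sorted(workers.items()):
        for package, client_version in sorted(client.items()):
            # A package absent on the worker also counts as a mismatch.
            worker_version = versions.get(package, "missing")
            if worker_version != client_version:
                mismatches.append((worker, package, client_version, worker_version))
    return mismatches
```

For example, with the Python versions seen in the traceback and a made-up, identical dask version on both sides, find_mismatches({"python": "3.6", "dask": "1.0"}, {"worker-1": {"python": "3.5", "dask": "1.0"}}) returns [("worker-1", "python", "3.6", "3.5")].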