Dask Distributed 似乎没有在 compute() 期间传递请求的操作

Dask Distributed appears to not be passing the requested operation during compute()

在下面的操作中(改编自 Dask DataFrame API 文档),如果我没有附加到调度程序(将分配客户端变量的行注释掉),操作将按预期成功完成.

from dask.distributed import Client
import dask.dataframe as dd
import pandas as pd

connection_loc = 'foobar.net:8786'
# client = Client(connection_loc)

df = pd.DataFrame({'x': [1, 2, 3, 4, 5], 'y': [1., 2., 3., 4., 5.]})
ddf = dd.from_pandas(df, npartitions=2)
foo = ddf.map_overlap(lambda df: df.rolling(2).sum(), 2, 0).compute()

取消注释同一行并分配客户端连接的那一刻,发生以下错误:TypeError: unorderable types: list() >= int()(有关更多信息,请参见完整的回溯)。

检查回溯,我可以看到它试图反序列化的字节串不是我期望的它应该尝试反序列化的字节串(参见完整回溯中的第一行 distributed.protocol.pickle - INFO - Failed to deserialize)。

我已经完全停止并重新启动了远程容器 运行 worker 和 scheduler 都无济于事。我也用过 client.restart() 没有运气。知道为什么将此其他任务传递给工作人员并抛出此错误吗?有什么办法可以让 Dask 停止这样做吗?

完整追溯:

dask_worker_1     | distributed.protocol.pickle - INFO - Failed to deserialize b"\x80\x04\x95+\x01\x00\x00\x00\x00\x00\x00(\x8c\x17cloudpickle.cloudpickle\x94\x8c\x0e_fill_function\x94\x93\x94(h\x00\x8c\x0f_make_skel_func\x94\x93\x94h\x00\x8c\r_builtin_type\x94\x93\x94\x8c\x08CodeType\x94\x85\x94R\x94(K\x01K\x00K\x01K\x02KCC\x0e|\x00j\x00d\x01\x83\x01j\x01\x83\x00S\x00\x94NK\x02\x86\x94\x8c\x07rolling\x94\x8c\x03sum\x94\x86\x94\x8c\x02df\x94\x85\x94\x8c\x1fdask_method/dask_dist_matrix.py\x94\x8c\x08<lambda>\x94K\rC\x00\x94))t\x94R\x94]\x94}\x94\x87\x94R\x94}\x94N}\x94tRN\x8c3('from_pandas-ddc065084280667dd51853b144bdd4e8', 0)\x94NK\x02K\x00)}\x94t\x94."
dask_worker_1     | Traceback (most recent call last):
dask_worker_1     |   File "/usr/local/lib/python3.5/site-packages/distributed/protocol/pickle.py", line 59, in loads
dask_worker_1     |     return pickle.loads(x)
dask_worker_1     |   File "/usr/local/lib/python3.5/site-packages/cloudpickle/cloudpickle.py", line 935, in _make_skel_func
dask_worker_1     |     if cell_count >= 0 else
dask_worker_1     | TypeError: unorderable types: list() >= int()
dask_worker_1     | distributed.worker - WARNING - Could not deserialize task
dask_worker_1     | Traceback (most recent call last):
dask_worker_1     |   File "/usr/local/lib/python3.5/site-packages/distributed/worker.py", line 1113, in add_task
dask_worker_1     |     self.tasks[key] = _deserialize(function, args, kwargs, task)
dask_worker_1     |   File "/usr/local/lib/python3.5/site-packages/distributed/worker.py", line 573, in _deserialize
dask_worker_1     |     args = pickle.loads(args)
dask_worker_1     |   File "/usr/local/lib/python3.5/site-packages/distributed/protocol/pickle.py", line 59, in loads
dask_worker_1     |     return pickle.loads(x)
dask_worker_1     |   File "/usr/local/lib/python3.5/site-packages/cloudpickle/cloudpickle.py", line 935, in _make_skel_func
dask_worker_1     |     if cell_count >= 0 else
dask_worker_1     | TypeError: unorderable types: list() >= int()

达斯克:0.15.0 分布式:1.17.1 OS:Ubuntu 16.04.2 LTS

我怀疑您的工作人员和客户之间的 cloudpickle 版本不匹配。您必须确保您的所有员工和客户都具有相同的软件设置。您可以尝试以下命令来提供帮助:

client.get_versions(check=True)

我认为这在 dask.distributed 1.17.1 版中不包括 cloudpickle,但在所有后续版本中都应该包括。 (现在可以在 master 中使用了)

正如其他答案所提到的,这几乎可以肯定是软件版本不匹配。我遇到了同样的问题。

我做了几件事来让它重新工作。我使用的是 dask_ec2,因此我将在此处包含这些更改,但我不知道您是如何设置集群的。

首先,当我在本地使用 ubuntu 16.04 时,我想如果分布式服务器具有相同的版本,它更有可能具有相同的库等,但这有一个问题(参见 https://github.com/dask/dask-ec2/issues/98)。总结:我修改了dask_ec2/salt.py,在__install_salt_rest_api方法中设置为下载cherrypy==3.2.3(详见链接问题)

其次,我将 dask_ec2 设置为使用较新版本的 Anaconda。在 dask_ec2/formulas/salt/conda/settings.sls,将 download_url 行更改为:

{% set download_url = 'https://repo.continuum.io/archive/Anaconda2-5.0.1-Linux-x86_64.sh' %}

{% set download_url = 'https://repo.continuum.io/archive/Anaconda3-5.0.1-Linux-x86_64.sh' %}

第三,我 运行 在我自己的计算机上进行更新,以确保我自己的库是最新的:

例如来自:Upgrading all packages with pip

pip freeze --local | grep -v '^\-e' | cut -d = -f 1  | xargs -n1 pip install -U

conda update --all

我终于重新启动了全部,一切正常。