dask-yarn job fails with dumps_msgpack ImportError while reading parquet

I'm trying to do a simple read and count of a small parquet file (10K records) using dask-yarn on an AWS EMR cluster with one master node and one worker node, both m5.xlarge instances.

I'm running the following code to test my cluster:

import os
os.environ['ARROW_LIBHDFS_DIR'] = '/usr/lib/hadoop/lib/native/'

from dask_yarn import YarnCluster
from dask.distributed import Client
import dask.dataframe as dd

cluster = YarnCluster(environment='conf/conda_envs/dask_yarn.tar.gz',
                      worker_vcores=1,
                      worker_memory="2GiB")

cluster.scale(2)

client = Client(cluster)

# path = 's3://bucket-name/samples/data_10K/*'
path = 'hdfs:///samples/data_10K/*'

df = dd.read_parquet(path, engine='pyarrow', columns=['YEAR', 'MONTH', 'DAY_OF_MONTH', 'FL_DATE', 'DEP_TIME', 'ARR_TIME', 'ORIGIN', 'DEST', 'ACTUAL_ELAPSED_TIME'])

print(df.count().compute())

client.shutdown()
cluster.shutdown()

But I get this exception:

Traceback (most recent call last):
  File "dask_test.py", line 30, in <module>
    print(df.count().compute())
  File "/home/hadoop/miniconda3/envs/dask_yarn/lib/python3.8/site-packages/dask/base.py", line 284, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/hadoop/miniconda3/envs/dask_yarn/lib/python3.8/site-packages/dask/base.py", line 566, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/hadoop/miniconda3/envs/dask_yarn/lib/python3.8/site-packages/distributed/client.py", line 2646, in get
    futures = self._graph_to_futures(
  File "/home/hadoop/miniconda3/envs/dask_yarn/lib/python3.8/site-packages/distributed/client.py", line 2554, in _graph_to_futures
    dsk = dsk.__dask_distributed_pack__(self, keyset)
  File "/home/hadoop/miniconda3/envs/dask_yarn/lib/python3.8/site-packages/dask/highlevelgraph.py", line 946, in __dask_distributed_pack__
    from distributed.protocol.core import dumps_msgpack
ImportError: cannot import name 'dumps_msgpack' from 'distributed.protocol.core' (/home/hadoop/miniconda3/envs/dask_yarn/lib/python3.8/site-packages/distributed/protocol/core.py)
Exception ignored in: <function YarnCluster.__del__ at 0x7f6584a2ac10>

The exception occurs when reading from both S3 and HDFS.

My current conda environment is as follows:

# packages in environment at /home/hadoop/miniconda3/envs/dask_yarn:
#
# Name                    Version                   Build  Channel
_libgcc_mutex             0.1                 conda_forge    conda-forge
_openmp_mutex             4.5                       1_gnu    conda-forge
aiobotocore               1.3.0              pyhd8ed1ab_0    conda-forge
aiohttp                   3.7.4            py38h497a2fe_0    conda-forge
aioitertools              0.7.1              pyhd8ed1ab_0    conda-forge
async-timeout             3.0.1                   py_1000    conda-forge
attrs                     20.3.0             pyhd3deb0d_0    conda-forge
blas                      1.0                    openblas    anaconda
bokeh                     2.2.3                    py38_0    anaconda
boost-cpp                 1.74.0               hc6e9bd1_2    conda-forge
botocore                  1.20.49            pyhd8ed1ab_0    conda-forge
brotlipy                  0.7.0           py38h497a2fe_1001    conda-forge
bzip2                     1.0.8                h7f98852_4    conda-forge
c-ares                    1.17.1               h7f98852_1    conda-forge
ca-certificates           2020.12.5            ha878542_0    conda-forge
certifi                   2020.12.5        py38h578d9bd_1    conda-forge
cffi                      1.14.5           py38ha65f79e_0    conda-forge
chardet                   4.0.0            py38h578d9bd_1    conda-forge
click                     7.1.2              pyh9f0ad1d_0    conda-forge
cloudpickle               1.6.0                      py_0    conda-forge
conda-pack                0.6.0              pyhd3deb0d_0    conda-forge
cryptography              3.4.7            py38ha5dfef3_0    conda-forge
curl                      7.76.1               h979ede3_1    conda-forge
cytoolz                   0.11.0           py38h497a2fe_3    conda-forge
dask                      2021.4.0           pyhd3eb1b0_0  
dask-core                 2021.4.0           pyhd3eb1b0_0  
dask-yarn                 0.9              py38h578d9bd_0    conda-forge
distributed               2021.4.1         py38h578d9bd_0    conda-forge
freetype                  2.10.4               h5ab3b9f_0    anaconda
fsspec                    2021.4.0           pyhd8ed1ab_0    conda-forge
gettext                   0.19.8.1          h0b5b191_1005    conda-forge
greenlet                  1.0.0            py38h709712a_0    conda-forge
grpcio                    1.37.0           py38hdd6454d_0    conda-forge
heapdict                  1.0.1                      py_0    conda-forge
icu                       68.1                 h58526e2_0    conda-forge
idna                      3.1                pyhd3deb0d_0    conda-forge
jinja2                    2.11.2                     py_0    anaconda
jmespath                  0.10.0             pyh9f0ad1d_0    conda-forge
jpeg                      9b                   habf39ab_1    anaconda
krb5                      1.17.2               h926e7f8_0    conda-forge
lcms2                     2.11                 h396b838_0    anaconda
ld_impl_linux-64          2.35.1               hea4e1c9_2    conda-forge
libcurl                   7.76.1               hc4aaa36_1    conda-forge
libedit                   3.1.20191231         he28a2e2_2    conda-forge
libev                     4.33                 h516909a_1    conda-forge
libffi                    3.3                  h58526e2_2    conda-forge
libgcc-ng                 9.3.0               h2828fa1_19    conda-forge
libgcrypt                 1.9.3                h7f98852_0    conda-forge
libgfortran-ng            7.3.0                hdf63c60_0    anaconda
libgomp                   9.3.0               h2828fa1_19    conda-forge
libgpg-error              1.42                 h9c3ff4c_0    conda-forge
libgsasl                  1.8.0                         2    conda-forge
libhdfs3                  2.3               hb485604_1015    conda-forge
libiconv                  1.16                 h516909a_0    conda-forge
libnghttp2                1.43.0               h812cca2_0    conda-forge
libntlm                   1.4               h7f98852_1002    conda-forge
libopenblas               0.3.10               h5a2b251_0    anaconda
libpng                    1.6.37               hbc83047_0    anaconda
libprotobuf               3.15.8               h780b84a_0    conda-forge
libssh2                   1.9.0                ha56f1ee_6    conda-forge
libstdcxx-ng              9.3.0               h6de172a_19    conda-forge
libtiff                   4.1.0                h2733197_1  
libuuid                   2.32.1            h7f98852_1000    conda-forge
libxml2                   2.9.10               h72842e0_4    conda-forge
locket                    0.2.0                      py_2    conda-forge
lz4-c                     1.9.3                h9c3ff4c_0    conda-forge
markupsafe                1.1.1            py38h7b6447c_0    anaconda
msgpack-python            1.0.2            py38h1fd1430_1    conda-forge
multidict                 5.1.0            py38h497a2fe_1    conda-forge
ncurses                   6.2                  h58526e2_4    conda-forge
numpy                     1.19.1           py38h30dfecb_0    anaconda
numpy-base                1.19.1           py38h75fe3a5_0    anaconda
olefile                   0.46                       py_0    anaconda
openssl                   1.1.1k               h7f98852_0    conda-forge
packaging                 20.4                       py_0    anaconda
pandas                    1.1.3            py38he6710b0_0    anaconda
partd                     1.2.0              pyhd8ed1ab_0    conda-forge
pillow                    8.0.0            py38h9a89aac_0    anaconda
pip                       21.1               pyhd8ed1ab_0    conda-forge
protobuf                  3.15.8           py38h709712a_0    conda-forge
psutil                    5.8.0            py38h497a2fe_1    conda-forge
pyarrow                   4.0.0                    pypi_0    pypi
pycparser                 2.20               pyh9f0ad1d_2    conda-forge
pyopenssl                 20.0.1             pyhd8ed1ab_0    conda-forge
pyparsing                 2.4.7                      py_0    anaconda
pysocks                   1.7.1            py38h578d9bd_3    conda-forge
python                    3.8.8           hffdb5ce_0_cpython    conda-forge
python-dateutil           2.8.1                      py_0    anaconda
python_abi                3.8                      1_cp38    conda-forge
pytz                      2020.1                     py_0    anaconda
pyyaml                    5.4.1            py38h497a2fe_0    conda-forge
readline                  8.1                  h46c0cb4_0    conda-forge
s3fs                      2021.4.0           pyhd8ed1ab_0    conda-forge
setuptools                49.6.0           py38h578d9bd_3    conda-forge
six                       1.15.0             pyh9f0ad1d_0    conda-forge
skein                     0.8.1            py38h578d9bd_1    conda-forge
sortedcontainers          2.3.0              pyhd8ed1ab_0    conda-forge
sqlalchemy                1.4.11           py38h497a2fe_0    conda-forge
sqlite                    3.35.5               h74cdb3f_0    conda-forge
tblib                     1.7.0              pyhd8ed1ab_0    conda-forge
tk                        8.6.10               h21135ba_1    conda-forge
toolz                     0.11.1                     py_0    conda-forge
tornado                   6.1              py38h497a2fe_1    conda-forge
typing-extensions         3.7.4.3                       0    conda-forge
typing_extensions         3.7.4.3                    py_0    anaconda
urllib3                   1.26.4             pyhd8ed1ab_0    conda-forge
wheel                     0.36.2             pyhd3deb0d_0    conda-forge
wrapt                     1.12.1           py38h497a2fe_3    conda-forge
xz                        5.2.5                h516909a_1    conda-forge
yaml                      0.2.5                h516909a_0    conda-forge
yarl                      1.6.3            py38h497a2fe_1    conda-forge
zict                      2.0.0                      py_0    conda-forge
zlib                      1.2.11            h516909a_1010    conda-forge
zstd                      1.4.9                ha95c52a_0    conda-forge

I had to install pyarrow with pip3; otherwise I get a different exception that prevents me from reading from HDFS or S3.

The YARN logs are as follows:

21/04/28 23:28:31 INFO client.RMProxy: Connecting to ResourceManager at ip-XXXXXXXX.us-west-1.compute.internal/XXXXXXXXXX:8032
21/04/28 23:28:31 INFO client.AHSProxy: Connecting to Application History server at ip-XXXXXXXX.us-west-1.compute.internal/XXXXXXXXXX:10200
Container: container_1619645048753_0020_01_000002 on ip-xxxxxxxxx.us-west-1.compute.internal_8041
LogAggregationType: AGGREGATED
===================================================================================================
LogType:dask.scheduler.log
LogLastModifiedTime:Wed Apr 28 23:28:10 +0000 2021
LogLength:787
LogContents:
distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:   tcp://XXXXXXXXXXXX:32843
distributed.scheduler - INFO -   dashboard at:                    :34205
distributed.scheduler - INFO - Receive client connection: Client-6371f976-a879-11eb-aff0-063a3c27c63d
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Remove client Client-6371f976-a879-11eb-aff0-063a3c27c63d
distributed.scheduler - INFO - Remove client Client-6371f976-a879-11eb-aff0-063a3c27c63d
distributed.scheduler - INFO - Close client connection: Client-6371f976-a879-11eb-aff0-063a3c27c63d

End of LogType:dask.scheduler.log
***********************************************************************************


End of LogType:prelaunch.err
******************************************************************************

Container: container_1619645048753_0020_01_000002 on ip-xxxxxxxxx.us-west-1.compute.internal_8041
LogAggregationType: AGGREGATED
===================================================================================================
LogType:prelaunch.out
LogLastModifiedTime:Wed Apr 28 23:28:10 +0000 2021
LogLength:70
LogContents:
Setting up env variables
Setting up job resources
Launching container

End of LogType:prelaunch.out
******************************************************************************

Container: container_1619645048753_0020_01_000001 on ip-xxxxxxxxx.us-west-1.compute.internal_8041
LogAggregationType: AGGREGATED
===================================================================================================
LogType:application.master.log
LogLastModifiedTime:Wed Apr 28 23:28:10 +0000 2021
LogLength:2700
LogContents:
21/04/28 23:27:54 INFO skein.ApplicationMaster: Starting Skein version 0.8.1
21/04/28 23:27:54 INFO skein.ApplicationMaster: Running as user hadoop
21/04/28 23:27:54 INFO conf.Configuration: resource-types.xml not found
21/04/28 23:27:54 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
21/04/28 23:27:54 INFO resource.ResourceUtils: Adding resource type - name = memory-mb, units = Mi, type = COUNTABLE
21/04/28 23:27:54 INFO resource.ResourceUtils: Adding resource type - name = vcores, units = , type = COUNTABLE
21/04/28 23:27:54 INFO skein.ApplicationMaster: Application specification successfully loaded
21/04/28 23:27:55 INFO client.RMProxy: Connecting to ResourceManager at ip-XXXXXXXX.us-west-1.compute.internal/XXXXXXXXXX:8030
21/04/28 23:27:55 INFO skein.ApplicationMaster: gRPC server started at ip-xxxxxxxxx.us-west-1.compute.internal:46405
21/04/28 23:27:56 INFO skein.ApplicationMaster: WebUI server started at ip-xxxxxxxxx.us-west-1.compute.internal:46231
21/04/28 23:27:56 INFO skein.ApplicationMaster: Registering application with resource manager
21/04/28 23:27:56 INFO client.RMProxy: Connecting to ResourceManager at ip-XXXXXXXX.us-west-1.compute.internal/XXXXXXXXXX:8032
21/04/28 23:27:56 INFO client.AHSProxy: Connecting to Application History server at ip-XXXXXXXXX.us-west-1.compute.internal/XXXXXXXXXX:10200
21/04/28 23:27:56 INFO skein.ApplicationMaster: Initializing service 'dask.worker'.
21/04/28 23:27:56 INFO skein.ApplicationMaster: Initializing service 'dask.scheduler'.
21/04/28 23:27:56 INFO skein.ApplicationMaster: REQUESTED: dask.scheduler_0
21/04/28 23:27:57 INFO skein.ApplicationMaster: Starting container_1619645048753_0020_01_000002...
21/04/28 23:27:57 INFO skein.ApplicationMaster: RUNNING: dask.scheduler_0 on container_1619645048753_0020_01_000002
21/04/28 23:28:07 INFO skein.ApplicationMaster: Scaling service 'dask.worker' to 2 instances, a delta of 2.
21/04/28 23:28:07 INFO skein.ApplicationMaster: REQUESTED: dask.worker_0
21/04/28 23:28:07 INFO skein.ApplicationMaster: REQUESTED: dask.worker_1
21/04/28 23:28:09 INFO skein.ApplicationMaster: Shutting down: Shutdown requested by user.
21/04/28 23:28:09 INFO skein.ApplicationMaster: Unregistering application with status SUCCEEDED
21/04/28 23:28:09 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
21/04/28 23:28:09 INFO skein.ApplicationMaster: Deleted application directory hdfs://ip-XXXXXXXXX.us-west-1.compute.internal:8020/user/hadoop/.skein/application_1619645048753_0020
21/04/28 23:28:09 INFO skein.ApplicationMaster: WebUI server shut down
21/04/28 23:28:09 INFO skein.ApplicationMaster: gRPC server shut down

End of LogType:application.master.log
***************************************************************************************


End of LogType:prelaunch.err
******************************************************************************

Container: container_1619645048753_0020_01_000001 on ip-xxxxxxxxx.us-west-1.compute.internal_8041
LogAggregationType: AGGREGATED
===================================================================================================
LogType:prelaunch.out
LogLastModifiedTime:Wed Apr 28 23:28:10 +0000 2021
LogLength:70
LogContents:
Setting up env variables
Setting up job resources
Launching container

End of LogType:prelaunch.out
******************************************************************************

Does anyone know how to fix this?

Thanks!

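Your dask and distributed versions are out of sync (2021.4.0 vs. 2021.4.1). Updating dask should fix this. Note that you also need to make sure the exact same versions are present in the environment you use for YARN.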
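A quick way to confirm this kind of mismatch, both on the client and inside the environment you pack for YARN, is to compare the installed versions. The sketch below is illustrative only: `release_tuple`, `versions_match` and `installed_version` are hypothetical helper names, and the check assumes, as this answer states, that dask and distributed must be on exactly the same release. It normalizes "2021.04.0" and "2021.4.0" to the same tuple because the Python metadata and the conda listing may spell the month differently.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional, Tuple


def release_tuple(v: str) -> Tuple[int, ...]:
    """Parse the leading numeric release, e.g. '2021.04.0' -> (2021, 4, 0)."""
    parts = []
    for piece in v.strip().split("."):
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        if digits:
            parts.append(int(digits))
        if not digits or digits != piece:
            break  # stop at the first non-numeric segment or local suffix
    return tuple(parts)


def versions_match(dask_version: str, distributed_version: str) -> bool:
    """Assumed rule (from the answer): both packages share one release."""
    a, b = release_tuple(dask_version), release_tuple(distributed_version)
    return bool(a) and a == b


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of `package`, or None if it is missing."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    dask_v = installed_version("dask")
    dist_v = installed_version("distributed")
    print(f"dask={dask_v} distributed={dist_v}")
    if dask_v is None or dist_v is None:
        print("dask and/or distributed is not installed here")
    elif not versions_match(dask_v, dist_v):
        print("Mismatch: install the same release of both packages")
    else:
        print("dask and distributed versions match")
```

Run it with the same interpreter that builds the packed environment, so the result reflects what the YARN containers will actually load.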

I solved the problem by installing a previous version of distributed instead of the latest one.

The Dask distributed changelog (https://distributed.dask.org/en/latest/changelog.html) shows that dumps_msgpack was removed in distributed 2021.4.1, so downgrading to 2021.4.0 did the trick.
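To check which side of the change an environment is on, you can probe for the exact name the traceback fails to import. This is a minimal sketch: `module_exports` is a hypothetical helper, and a `False` result only tells you the name is absent (or distributed is not installed), not which release removed it.

```python
import importlib


def module_exports(module_name: str, attr: str) -> bool:
    """Return True if `module_name` imports cleanly and defines `attr`."""
    try:
        module = importlib.import_module(module_name)
    except ImportError:  # also covers ModuleNotFoundError
        return False
    return hasattr(module, attr)


if __name__ == "__main__":
    # dask 2021.4.0's HighLevelGraph imports this name (see the traceback).
    ok = module_exports("distributed.protocol.core", "dumps_msgpack")
    print("distributed.protocol.core.dumps_msgpack available:", ok)
```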

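However, removing packages from the environment causes further problems, so I recommend installing the right distributed version from the start with this command: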

conda install -c conda-forge distributed=2021.4.0 dask-yarn