Dask Dataframe "ValueError: Data is compressed as snappy but we don't have this installed"

Dask Dataframe "ValueError: Data is compressed as snappy but we don't have this installed"

python-snappy appears to be installed, yet Dask raises a ValueError.

Helm configuration for the jupyter and worker pods:

env:
  - name: EXTRA_CONDA_PACKAGES
    value: numba xarray s3fs python-snappy pyarrow ruamel.yaml -c conda-forge
  - name: EXTRA_PIP_PACKAGES
    value: dask-ml --upgrade

The containers show python-snappy as installed (via conda list).

The dataframe is loaded from a multi-part Parquet file written by Apache Drill:

import s3fs
import dask.dataframe as dd

fs = s3fs.S3FileSystem()  # assumed; fs and filename were defined elsewhere in the original session
files = ['s3://{}'.format(f) for f in fs.glob(path='{}/*.parquet'.format(filename))]
df = dd.read_parquet(files)

Running len(df) against the dataframe then raises:

distributed.utils - ERROR - Data is compressed as snappy but we don't have this installed
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/distributed/utils.py", line 622, in log_errors
    yield
  File "/opt/conda/lib/python3.6/site-packages/distributed/client.py", line 921, in _handle_report
    six.reraise(*clean_exception(**msg))
  File "/opt/conda/lib/python3.6/site-packages/six.py", line 692, in reraise
    raise value.with_traceback(tb)
  File "/opt/conda/lib/python3.6/site-packages/distributed/comm/tcp.py", line 203, in read
    msg = yield from_frames(frames, deserialize=self.deserialize)
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run
    return
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 315, in wrapper
    future.set_result(_value_from_stopiteration(e))
  File "/opt/conda/lib/python3.6/site-packages/distributed/comm/utils.py", line 75, in from_frames
    res = _from_frames()
  File "/opt/conda/lib/python3.6/site-packages/distributed/comm/utils.py", line 61, in _from_frames
    return protocol.loads(frames, deserialize=deserialize)
  File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/core.py", line 96, in loads
    msg = loads_msgpack(small_header, small_payload)
  File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/core.py", line 171, in loads_msgpack
    " installed" % str(header['compression']))
ValueError: Data is compressed as snappy but we don't have this installed

Can anyone suggest the correct configuration here, or steps to remedy this?

This error does not actually come from reading your Parquet files; it comes from how Dask compresses data as it moves between machines. You can resolve it by making the python-snappy installation consistent across all of the client/scheduler/worker pods: either install it everywhere or nowhere.
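One way to confirm the mismatch is to compare which codecs each process can actually import. A minimal sketch, assuming a running dask.distributed Client (the scheduler address below is a placeholder):

from dask.distributed import Client

def available_compressions():
    # the registry distributed consults when compressing messages
    from distributed.protocol.compression import compressions
    return sorted(k for k in compressions if k is not None)

client = Client('dask-scheduler:8786')                   # placeholder address
print(available_compressions())                          # this client process
print(client.run_on_scheduler(available_compressions))   # the scheduler
print(client.run(available_compressions))                # every worker

If the lists differ, say the workers report snappy but the scheduler does not, you have found the inconsistency described above.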

You should do one of the following:

  1. Remove python-snappy from the conda package lists of the jupyter and worker pods. If you are using pyarrow then it is unnecessary anyway; I believe Arrow includes snappy at the C++ level.
  2. Add python-snappy to your scheduler pod (see the sketch after this list).
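For option 2, a minimal sketch of the corresponding Helm values; this assumes your version of the chart exposes the same EXTRA_CONDA_PACKAGES hook for the scheduler as it does for jupyter and worker:

scheduler:
  env:
    - name: EXTRA_CONDA_PACKAGES
      # must match what the jupyter and worker pods install
      value: python-snappy -c conda-forge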

FWIW, I personally recommend lz4 over snappy for inter-machine compression.
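If you go that route, drop python-snappy and install lz4 on every pod instead; I believe distributed picks lz4 for inter-machine compression automatically when it is importable. A sketch of the env entry (assumption: the same value is applied to the jupyter, worker, and scheduler sections):

env:
  - name: EXTRA_CONDA_PACKAGES
    # lz4 replaces python-snappy; keep this list identical across pods
    value: numba xarray s3fs lz4 pyarrow ruamel.yaml -c conda-forge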