使用 snappy 压缩时,对 Dask DataFrame 的操作失败

Operations on a Dask DataFrame fail when using snappy compression

我使用 pandas.DataFrame.to_parquet 将大型数据集划分为一系列 parquet 文件,并将它们保存到 S3。然后我使用 dask.read_parquet 将这些文件读入集群上的 Dask:
import dask.dataframe as dd
df = dd.read_parquet(
    's3://aleksey-emr-dask/data/2019-taxi-dataset/',
    storage_options={'key': 'secret', 'secret': 'secret'},
    engine='fastparquet'
)

pandas 默认使用 snappy 压缩。只要安装 python-snappy 和 snappy 软件包,fastparquet 就可以使用此压缩。由于我在 AWS EMR 上运行,并使用 Dask's EMR example bootstrap script,我使用 --bootstrap-actions 标志和 --conda-packages 可选参数从 conda-forge 安装了这些软件包:

python3 -m pip list | grep snappy
python-snappy          0.5.4

这足以让 dd.read_parquet 成功。但是,某些操作会因 KeyError: 'snappy' 而失败。例如,这失败了:

passenger_counts = df.trip_distance.value_counts().compute()

我知道这不是集群配置的错误,因为其他操作(如此操作)成功:

vendors = df.VendorID.value_counts().compute()
> 2.0    53516733
> 1.0    30368157
> 4.0      267080
> Name: VendorID, dtype: int64

这引出了我的问题。 Dask 是否不支持 snappy 压缩,即使其 IO 引擎(在本例中为 fastparquet)支持?

这是错误消息的全文:

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
<timed exec> in <module>

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
    165         dask.base.compute
    166         """
--> 167         (result,) = compute(self, traverse=False, **kwargs)
    168         return result
    169 

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    445         postcomputes.append(x.__dask_postcompute__())
    446 
--> 447     results = schedule(dsk, keys, **kwargs)
    448     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    449 

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2686                     should_rejoin = False
   2687             try:
-> 2688                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2689             finally:
   2690                 for f in futures.values():

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1986                 direct=direct,
   1987                 local_worker=local_worker,
-> 1988                 asynchronous=asynchronous,
   1989             )
   1990 

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    831         else:
    832             return sync(
--> 833                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    834             )
    835 

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    337     if error[0]:
    338         typ, exc, tb = error[0]
--> 339         raise exc.with_traceback(tb)
    340     else:
    341         return result[0]

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/utils.py in f()
    321             if callback_timeout is not None:
    322                 future = asyncio.wait_for(future, callback_timeout)
--> 323             result[0] = yield future
    324         except Exception as exc:
    325             error[0] = sys.exc_info()

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1874                 else:
   1875                     self._gather_future = future
-> 1876                 response = await future
   1877 
   1878             if response["status"] == "error":

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/client.py in _gather_remote(self, direct, local_worker)
   1925 
   1926             else:  # ask scheduler to gather data for us
-> 1927                 response = await retry_operation(self.scheduler.gather, keys=keys)
   1928 
   1929         return response

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/utils_comm.py in retry_operation(coro, operation, *args, **kwargs)
    388         delay_min=retry_delay_min,
    389         delay_max=retry_delay_max,
--> 390         operation=operation,
    391     )

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/utils_comm.py in retry(coro, count, delay_min, delay_max, jitter_fraction, retry_on_exceptions, operation)
    368                 delay *= 1 + random.random() * jitter_fraction
    369             await asyncio.sleep(delay)
--> 370     return await coro()
    371 
    372 

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/core.py in send_recv_from_rpc(**kwargs)
    859             name, comm.name = comm.name, "ConnectionPool." + key
    860             try:
--> 861                 result = await send_recv(comm=comm, op=key, **kwargs)
    862             finally:
    863                 self.pool.reuse(self.addr, comm)

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/core.py in send_recv(comm, reply, serializers, deserializers, **kwargs)
    642         await comm.write(msg, serializers=serializers, on_error="raise")
    643         if reply:
--> 644             response = await comm.read(deserializers=deserializers)
    645         else:
    646             response = None

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/comm/tcp.py in read(self, deserializers)
    204                     deserialize=self.deserialize,
    205                     deserializers=deserializers,
--> 206                     allow_offload=self.allow_offload,
    207                 )
    208             except EOFError:

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/comm/utils.py in from_frames(frames, deserialize, deserializers, allow_offload)
     85         res = await offload(_from_frames)
     86     else:
---> 87         res = _from_frames()
     88 
     89     return res

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/comm/utils.py in _from_frames()
     64         try:
     65             return protocol.loads(
---> 66                 frames, deserialize=deserialize, deserializers=deserializers
     67             )
     68         except EOFError:

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/protocol/core.py in loads(frames, deserialize, deserializers)
    126             if deserialize or key in bytestrings:
    127                 if "compression" in head:
--> 128                     fs = decompress(head, fs)
    129                 fs = merge_frames(head, fs)
    130                 value = _deserialize(head, fs, deserializers=deserializers)

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/protocol/compression.py in decompress(header, frames)
    214     return [
    215         compressions[c]["decompress"](frame)
--> 216         for c, frame in zip(header["compression"], frames)
    217     ]

~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/protocol/compression.py in <listcomp>(.0)
    214     return [
    215         compressions[c]["decompress"](frame)
--> 216         for c, frame in zip(header["compression"], frames)
    217     ]

KeyError: 'snappy'

您还需要在客户端环境中安装 snappy 和 python-snappy,以便 worker 节点可以使用该编解码器将源字节转换为数据。

我正在通过 SSH 端口转发从我机器上的本地 Jupyter 笔记本访问集群,并且没有在本地安装这些包。在我的本地环境中安装它们:

$ conda install -c conda-forge snappy python-snappy

问题已解决。