使用 snappy 压缩时,对 Dask DataFrame 的操作失败
Operations on a Dask DataFrame fail when using snappy compression
我使用 pandas.DataFrame.to_parquet
将大型数据集划分为一系列 parquet
文件,并将它们保存到 S3。然后我使用 dask.read_parquet
将这些读入集群上的 Dask DataFrame:
import dask.dataframe as dd
df = dd.read_parquet(
's3://aleksey-emr-dask/data/2019-taxi-dataset/',
storage_options={'key': 'secret', 'secret': 'secret'},
engine='fastparquet'
)
pandas
默认使用 snappy
压缩。只要安装 python-snappy
和 snappy
软件包,fastparquet
就可以使用此压缩。由于我在 AWS EMR 上运行,并使用 Dask's EMR example bootstrap script,我使用 --bootstrap-actions
标志和 --conda-packages
可选参数从 conda-forge
安装了这些软件包:
python3 -m pip list | grep snappy
python-snappy 0.5.4
这足以让 dd.read_parquet
成功。但是,某些操作会因 KeyError: snappy
而失败。例如,这失败了:
passenger_counts = df.trip_distance.value_counts().compute()
我知道这不是集群配置的错误,因为其他操作(如此操作)成功:
vendors = df.VendorID.value_counts().compute()
> 2.0 53516733
> 1.0 30368157
> 4.0 267080
> Name: VendorID, dtype: int64
这引出了我的问题。 Dask 是否不支持 snappy
压缩,即使其 IO 引擎(在本例中为 fastparquet
)支持?
这是错误消息的全文:
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
<timed exec> in <module>
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
165 dask.base.compute
166 """
--> 167 (result,) = compute(self, traverse=False, **kwargs)
168 return result
169
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
445 postcomputes.append(x.__dask_postcompute__())
446
--> 447 results = schedule(dsk, keys, **kwargs)
448 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
449
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2686 should_rejoin = False
2687 try:
-> 2688 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2689 finally:
2690 for f in futures.values():
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1986 direct=direct,
1987 local_worker=local_worker,
-> 1988 asynchronous=asynchronous,
1989 )
1990
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
831 else:
832 return sync(
--> 833 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
834 )
835
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
337 if error[0]:
338 typ, exc, tb = error[0]
--> 339 raise exc.with_traceback(tb)
340 else:
341 return result[0]
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/utils.py in f()
321 if callback_timeout is not None:
322 future = asyncio.wait_for(future, callback_timeout)
--> 323 result[0] = yield future
324 except Exception as exc:
325 error[0] = sys.exc_info()
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/tornado/gen.py in run(self)
733
734 try:
--> 735 value = future.result()
736 except Exception:
737 exc_info = sys.exc_info()
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1874 else:
1875 self._gather_future = future
-> 1876 response = await future
1877
1878 if response["status"] == "error":
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/client.py in _gather_remote(self, direct, local_worker)
1925
1926 else: # ask scheduler to gather data for us
-> 1927 response = await retry_operation(self.scheduler.gather, keys=keys)
1928
1929 return response
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/utils_comm.py in retry_operation(coro, operation, *args, **kwargs)
388 delay_min=retry_delay_min,
389 delay_max=retry_delay_max,
--> 390 operation=operation,
391 )
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/utils_comm.py in retry(coro, count, delay_min, delay_max, jitter_fraction, retry_on_exceptions, operation)
368 delay *= 1 + random.random() * jitter_fraction
369 await asyncio.sleep(delay)
--> 370 return await coro()
371
372
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/core.py in send_recv_from_rpc(**kwargs)
859 name, comm.name = comm.name, "ConnectionPool." + key
860 try:
--> 861 result = await send_recv(comm=comm, op=key, **kwargs)
862 finally:
863 self.pool.reuse(self.addr, comm)
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/core.py in send_recv(comm, reply, serializers, deserializers, **kwargs)
642 await comm.write(msg, serializers=serializers, on_error="raise")
643 if reply:
--> 644 response = await comm.read(deserializers=deserializers)
645 else:
646 response = None
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/comm/tcp.py in read(self, deserializers)
204 deserialize=self.deserialize,
205 deserializers=deserializers,
--> 206 allow_offload=self.allow_offload,
207 )
208 except EOFError:
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/comm/utils.py in from_frames(frames, deserialize, deserializers, allow_offload)
85 res = await offload(_from_frames)
86 else:
---> 87 res = _from_frames()
88
89 return res
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/comm/utils.py in _from_frames()
64 try:
65 return protocol.loads(
---> 66 frames, deserialize=deserialize, deserializers=deserializers
67 )
68 except EOFError:
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/protocol/core.py in loads(frames, deserialize, deserializers)
126 if deserialize or key in bytestrings:
127 if "compression" in head:
--> 128 fs = decompress(head, fs)
129 fs = merge_frames(head, fs)
130 value = _deserialize(head, fs, deserializers=deserializers)
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/protocol/compression.py in decompress(header, frames)
214 return [
215 compressions[c]["decompress"](frame)
--> 216 for c, frame in zip(header["compression"], frames)
217 ]
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/protocol/compression.py in <listcomp>(.0)
214 return [
215 compressions[c]["decompress"](frame)
--> 216 for c, frame in zip(header["compression"], frames)
217 ]
KeyError: 'snappy'
您还需要在客户端环境中安装 snappy
和 python-snappy
,以便客户端可以使用该编解码器将来自 worker 的源字节解码为数据。
我正在通过 SSH 端口转发从我机器上的本地 Jupyter 笔记本访问集群,并且没有在本地安装这些包。在我的本地环境中安装它们:
$ conda install -c conda-forge snappy python-snappy
问题已解决。
我使用 pandas.DataFrame.to_parquet
将大型数据集划分为一系列 parquet
文件,并将它们保存到 S3。然后我使用 dask.read_parquet
将这些读入集群上的 Dask DataFrame:
import dask.dataframe as dd
df = dd.read_parquet(
's3://aleksey-emr-dask/data/2019-taxi-dataset/',
storage_options={'key': 'secret', 'secret': 'secret'},
engine='fastparquet'
)
pandas
默认使用 snappy
压缩。只要安装 python-snappy
和 snappy
软件包,fastparquet
就可以使用此压缩。由于我在 AWS EMR 上运行,并使用 Dask's EMR example bootstrap script,我使用 --bootstrap-actions
标志和 --conda-packages
可选参数从 conda-forge
安装了这些软件包:
python3 -m pip list | grep snappy
python-snappy 0.5.4
这足以让 dd.read_parquet
成功。但是,某些操作会因 KeyError: snappy
而失败。例如,这失败了:
passenger_counts = df.trip_distance.value_counts().compute()
我知道这不是集群配置的错误,因为其他操作(如此操作)成功:
vendors = df.VendorID.value_counts().compute()
> 2.0 53516733
> 1.0 30368157
> 4.0 267080
> Name: VendorID, dtype: int64
这引出了我的问题。 Dask 是否不支持 snappy
压缩,即使其 IO 引擎(在本例中为 fastparquet
)支持?
这是错误消息的全文:
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
<timed exec> in <module>
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
165 dask.base.compute
166 """
--> 167 (result,) = compute(self, traverse=False, **kwargs)
168 return result
169
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
445 postcomputes.append(x.__dask_postcompute__())
446
--> 447 results = schedule(dsk, keys, **kwargs)
448 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
449
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2686 should_rejoin = False
2687 try:
-> 2688 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2689 finally:
2690 for f in futures.values():
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1986 direct=direct,
1987 local_worker=local_worker,
-> 1988 asynchronous=asynchronous,
1989 )
1990
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
831 else:
832 return sync(
--> 833 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
834 )
835
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
337 if error[0]:
338 typ, exc, tb = error[0]
--> 339 raise exc.with_traceback(tb)
340 else:
341 return result[0]
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/utils.py in f()
321 if callback_timeout is not None:
322 future = asyncio.wait_for(future, callback_timeout)
--> 323 result[0] = yield future
324 except Exception as exc:
325 error[0] = sys.exc_info()
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/tornado/gen.py in run(self)
733
734 try:
--> 735 value = future.result()
736 except Exception:
737 exc_info = sys.exc_info()
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1874 else:
1875 self._gather_future = future
-> 1876 response = await future
1877
1878 if response["status"] == "error":
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/client.py in _gather_remote(self, direct, local_worker)
1925
1926 else: # ask scheduler to gather data for us
-> 1927 response = await retry_operation(self.scheduler.gather, keys=keys)
1928
1929 return response
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/utils_comm.py in retry_operation(coro, operation, *args, **kwargs)
388 delay_min=retry_delay_min,
389 delay_max=retry_delay_max,
--> 390 operation=operation,
391 )
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/utils_comm.py in retry(coro, count, delay_min, delay_max, jitter_fraction, retry_on_exceptions, operation)
368 delay *= 1 + random.random() * jitter_fraction
369 await asyncio.sleep(delay)
--> 370 return await coro()
371
372
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/core.py in send_recv_from_rpc(**kwargs)
859 name, comm.name = comm.name, "ConnectionPool." + key
860 try:
--> 861 result = await send_recv(comm=comm, op=key, **kwargs)
862 finally:
863 self.pool.reuse(self.addr, comm)
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/core.py in send_recv(comm, reply, serializers, deserializers, **kwargs)
642 await comm.write(msg, serializers=serializers, on_error="raise")
643 if reply:
--> 644 response = await comm.read(deserializers=deserializers)
645 else:
646 response = None
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/comm/tcp.py in read(self, deserializers)
204 deserialize=self.deserialize,
205 deserializers=deserializers,
--> 206 allow_offload=self.allow_offload,
207 )
208 except EOFError:
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/comm/utils.py in from_frames(frames, deserialize, deserializers, allow_offload)
85 res = await offload(_from_frames)
86 else:
---> 87 res = _from_frames()
88
89 return res
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/comm/utils.py in _from_frames()
64 try:
65 return protocol.loads(
---> 66 frames, deserialize=deserialize, deserializers=deserializers
67 )
68 except EOFError:
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/protocol/core.py in loads(frames, deserialize, deserializers)
126 if deserialize or key in bytestrings:
127 if "compression" in head:
--> 128 fs = decompress(head, fs)
129 fs = merge_frames(head, fs)
130 value = _deserialize(head, fs, deserializers=deserializers)
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/protocol/compression.py in decompress(header, frames)
214 return [
215 compressions[c]["decompress"](frame)
--> 216 for c, frame in zip(header["compression"], frames)
217 ]
~/opt/miniconda3/envs/dask-local-test-env/lib/python3.7/site-packages/distributed/protocol/compression.py in <listcomp>(.0)
214 return [
215 compressions[c]["decompress"](frame)
--> 216 for c, frame in zip(header["compression"], frames)
217 ]
KeyError: 'snappy'
您还需要在客户端环境中安装 snappy
和 python-snappy
,以便客户端可以使用该编解码器将来自 worker 的源字节解码为数据。
我正在通过 SSH 端口转发从我机器上的本地 Jupyter 笔记本访问集群,并且没有在本地安装这些包。在我的本地环境中安装它们:
$ conda install -c conda-forge snappy python-snappy
问题已解决。