How to read a single parquet file from s3 into a dask dataframe?
I'm trying to read a single snappy-compressed parquet file from S3 into a Dask DataFrame. There is no metadata directory, since this file was written with Spark 2.1.
It doesn't work locally with fastparquet:
import dask.dataframe as dd
dd.read_parquet('test.snappy.parquet', engine='fastparquet')
I get these exceptions:
NotADirectoryError Traceback (most recent call last)
~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, sep, root)
95 self.fn = fn2
---> 96 with open_with(fn2, 'rb') as f:
97 self._parse_header(f, verify)
~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/dask/bytes/core.py in __enter__(self)
311 mode = self.mode.replace('t', '').replace('b', '') + 'b'
--> 312 f = f2 = self.myopen(self.path, mode=mode)
313 CompressFile = merge(seekable_files, compress_files)[self.compression]
~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/dask/bytes/local.py in open(self, path, mode, **kwargs)
60 path = self._trim_filename(path)
---> 61 return open(path, mode=mode)
62
NotADirectoryError: [Errno 20] Not a directory: '/home/arinarmo/test.snappy.parquet/_metadata'
During handling of the above exception, another exception occurred:
TypeError Traceback (most recent call last)
~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/fastparquet/api.py in _parse_header(self, f, verify)
118 try:
--> 119 fmd = read_thrift(f, parquet_thrift.FileMetaData)
120 except Exception:
~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/fastparquet/thrift_structures.py in read_thrift(file_obj, ttype)
21 obj = ttype()
---> 22 obj.read(pin)
23
~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/fastparquet/parquet_thrift/parquet/ttypes.py in read(self, iprot)
1864 if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None:
-> 1865 iprot._fast_decode(self, iprot, (self.__class__, self.thrift_spec))
1866 return
TypeError: expecting list of size 2 for struct args
During handling of the above exception, another exception occurred:
ParquetException Traceback (most recent call last)
<ipython-input-21-0dc755d9917b> in <module>()
----> 1 dd.read_parquet('test.snappy.parquet', engine='fastparquet')
~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/dask/dataframe/io/parquet.py in read_parquet(path, columns, filters, categories, index, storage_options, engine)
763
764 return read(fs, paths, file_opener, columns=columns, filters=filters,
--> 765 categories=categories, index=index)
766
767
~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/dask/dataframe/io/parquet.py in _read_fastparquet(fs, paths, myopen, columns, filters, categories, index, storage_options)
209 sep=fs.sep)
210 except Exception:
--> 211 pf = fastparquet.ParquetFile(paths[0], open_with=myopen, sep=fs.sep)
212
213 check_column_names(pf.columns, categories)
~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, sep, root)
100 self.fn = fn
101 with open_with(fn, 'rb') as f:
--> 102 self._parse_header(f, verify)
103 self.open = open_with
104
~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/fastparquet/api.py in _parse_header(self, f, verify)
120 except Exception:
121 raise ParquetException('Metadata parse failed: %s' %
--> 122 self.fn)
123 self.head_size = head_size
124 self.fmd = fmd
ParquetException: Metadata parse failed: test.snappy.parquet
It works with the local parquet file and pyarrow:
dd.read_parquet('test.snappy.parquet', engine='pyarrow')
Finally, trying S3 with pyarrow also fails:
dd.read_parquet('s3://redacted-location/test.snappy.parquet', engine='pyarrow')
With the following exception:
~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/dask/dataframe/io/parquet.py in read_parquet(path, columns, filters, categories, index, storage_options, engine)
763
764 return read(fs, paths, file_opener, columns=columns, filters=filters,
--> 765 categories=categories, index=index)
766
767
~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/dask/dataframe/io/parquet.py in _read_pyarrow(fs, paths, file_opener, columns, filters, categories, index)
492 columns = list(columns)
493
--> 494 dataset = pq.ParquetDataset(paths, filesystem=get_pyarrow_filesystem(fs))
495 schema = dataset.schema.to_arrow_schema()
496 has_pandas_metadata = schema.metadata is not None and b'pandas' in schema.metadata
~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/pyarrow/parquet.py in __init__(self, path_or_paths, filesystem, schema, metadata, split_row_groups, validate_schema)
703
704 if validate_schema:
--> 705 self.validate_schemas()
706
707 def validate_schemas(self):
~/.pyenv/versions/3.5.4/envs/hexapodask/lib/python3.5/site-packages/pyarrow/parquet.py in validate_schemas(self)
712 self.schema = open_file(self.metadata_path).schema
713 else:
--> 714 self.schema = self.pieces[0].get_metadata(open_file).schema
715 elif self.schema is None:
716 self.schema = self.metadata.schema
IndexError: list index out of range
In this question, using fastparquet.writer.merge is suggested, since it should write the metadata directory, but for me it fails with the same error as before.
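For reference, this is roughly how that merge call looks; a minimal sketch only, assuming s3fs is installed, and the bucket prefix below is a placeholder rather than my real location:

import s3fs
import fastparquet

# Hypothetical example: build a _metadata summary for parquet parts under one S3 prefix.
s3 = s3fs.S3FileSystem()
file_list = s3.glob('redacted-location/part-*.parquet')  # placeholder prefix
fastparquet.writer.merge(file_list, open_with=s3.open)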
The error given by fastparquet is misleading: it first tries to load the path as a directory, fails, and then loads the path directly as a file. The real failure is in decoding the thrift metadata. Since this commit you may find that parsing the file now works.
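With a fastparquet version that includes that fix, the original call should then work directly against S3. A minimal sketch, assuming s3fs is installed; the bucket path and credential values are placeholders (credentials can also come from the environment, in which case storage_options can be omitted):

import dask.dataframe as dd

# Placeholder bucket/key and credentials; engine='fastparquet' reads the single file's own footer.
df = dd.read_parquet(
    's3://redacted-location/test.snappy.parquet',
    engine='fastparquet',
    storage_options={'key': 'YOUR_ACCESS_KEY', 'secret': 'YOUR_SECRET_KEY'},
)
print(df.head())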