使用 Dask 从 google 云存储中读取镶木地板文件
Using Dask to read parquet files from a google cloud storage
我正在尝试使用 Dask 从 google 存储桶中读取和写入。
使用一堆 csv
文件可行,但不方便(速度较慢,无法压缩,不能只读取某些列)所以我尝试使用 apache parquet
格式。
写作似乎还不错:
import dask.dataframe as dd
pandas_df = pd.DataFrame({'x' : [2,3, 2], 'y': [1, 0, 0]})
dask_df = dd.from_pandas(pandas_df, npartitions=2)
dask_df.to_parquet("gcs://my_google_bucket/test/")
但是当我尝试读回它时
read_again_df = dd.read_parquet("gcs://my_google_bucket/test/")
我收到一个未实现的错误:
AttributeError Traceback (most recent call last)
~/miniconda3/envs/env1/lib/python3.6/site-packages/dask/bytes/core.py in get_pyarrow_filesystem(fs)
520 try:
--> 521 return fs._get_pyarrow_filesystem()
522 except AttributeError:
AttributeError: 'DaskGCSFileSystem' object has no attribute '_get_pyarrow_filesystem'
During handling of the above exception, another exception occurred:
NotImplementedError Traceback (most recent call last)
<ipython-input-42-ef1fc41d04d5> in <module>()
----> 1 read_again = dd.read_parquet("gcs://my_google_bucket/test/")
~/miniconda3/envs/env1/lib/python3.6/site-packages/dask/dataframe/io/parquet.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, infer_divisions)
991
992 return read(fs, fs_token, paths, columns=columns, filters=filters,
--> 993 categories=categories, index=index, infer_divisions=infer_divisions)
994
995
~/miniconda3/envs/env1/lib/python3.6/site-packages/dask/dataframe/io/parquet.py in _read_pyarrow(fs, fs_token, paths, columns, filters, categories, index, infer_divisions)
505 columns = list(columns)
506
--> 507 dataset = pq.ParquetDataset(paths, filesystem=get_pyarrow_filesystem(fs))
508 if dataset.partitions is not None:
509 partitions = [n for n in dataset.partitions.partition_names
~/miniconda3/envs/env1/lib/python3.6/site-packages/dask/bytes/core.py in get_pyarrow_filesystem(fs)
522 except AttributeError:
523 raise NotImplementedError("Using pyarrow with a %r "
--> 524 "filesystem object" % type(fs).__name__)
NotImplementedError: Using pyarrow with a 'DaskGCSFileSystem' filesystem object
我猜这意味着 dask
仍然无法直接从 google 云服务读取 parquet 文件。
有没有什么间接的方法可以使这项工作,比如说,使用 pyarrow
?
我想保留的是延迟加载东西然后使用dask
做数据转换的能力。
谢谢!
Dask 当然可以使用 fastparquet 后端从 GCS 读取 parquet (engine='fastparquet'
)。请注意,pyarrow 不会生成 fastparquet 期望的 _metadata
文件,因此您可以使用 fastparquet 写入数据,使用 fastparquet 从现有数据文件创建文件,或者传递一个指向所有数据文件而不是目录。
你正在做的事情也应该与 pyarrow 一起工作,因为 pyarrow 通常可以接受任何 python 类文件对象,但在这种情况下似乎试图制作一个 pyarrow 文件系统。您在上面看到的错误可能是错误,应该进行调查。
-编辑-
根据 OP 的评论,以下内容确实有效
pandas_df = pd.DataFrame({'x' : [2,3, 2], 'y': [1, 0, 0]})
dask_df = dd.from_pandas(pandas_df, npartitions=2)
dask_df.to_parquet("gcs://my_bucket/test", engine='fastparquet')
read_again_df = dd.read_parquet("gcs://my_bucket/test/", engine='fastparquet')
请注意,由于某些错误原因,dask_df.to_parquet()
需要使用 "gcs://my_bucket/test" 调用,不带“/”,否则 dd.read_parquet()
不起作用
我正在尝试使用 Dask 从 google 存储桶中读取和写入。
使用一堆 csv
文件可行,但不方便(速度较慢,无法压缩,不能只读取某些列)所以我尝试使用 apache parquet
格式。
写作似乎还不错:
import dask.dataframe as dd
pandas_df = pd.DataFrame({'x' : [2,3, 2], 'y': [1, 0, 0]})
dask_df = dd.from_pandas(pandas_df, npartitions=2)
dask_df.to_parquet("gcs://my_google_bucket/test/")
但是当我尝试读回它时
read_again_df = dd.read_parquet("gcs://my_google_bucket/test/")
我收到一个未实现的错误:
AttributeError Traceback (most recent call last)
~/miniconda3/envs/env1/lib/python3.6/site-packages/dask/bytes/core.py in get_pyarrow_filesystem(fs)
520 try:
--> 521 return fs._get_pyarrow_filesystem()
522 except AttributeError:
AttributeError: 'DaskGCSFileSystem' object has no attribute '_get_pyarrow_filesystem'
During handling of the above exception, another exception occurred:
NotImplementedError Traceback (most recent call last)
<ipython-input-42-ef1fc41d04d5> in <module>()
----> 1 read_again = dd.read_parquet("gcs://my_google_bucket/test/")
~/miniconda3/envs/env1/lib/python3.6/site-packages/dask/dataframe/io/parquet.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, infer_divisions)
991
992 return read(fs, fs_token, paths, columns=columns, filters=filters,
--> 993 categories=categories, index=index, infer_divisions=infer_divisions)
994
995
~/miniconda3/envs/env1/lib/python3.6/site-packages/dask/dataframe/io/parquet.py in _read_pyarrow(fs, fs_token, paths, columns, filters, categories, index, infer_divisions)
505 columns = list(columns)
506
--> 507 dataset = pq.ParquetDataset(paths, filesystem=get_pyarrow_filesystem(fs))
508 if dataset.partitions is not None:
509 partitions = [n for n in dataset.partitions.partition_names
~/miniconda3/envs/env1/lib/python3.6/site-packages/dask/bytes/core.py in get_pyarrow_filesystem(fs)
522 except AttributeError:
523 raise NotImplementedError("Using pyarrow with a %r "
--> 524 "filesystem object" % type(fs).__name__)
NotImplementedError: Using pyarrow with a 'DaskGCSFileSystem' filesystem object
我猜这意味着 dask
仍然无法直接从 google 云服务读取 parquet 文件。
有没有什么间接的方法可以使这项工作,比如说,使用 pyarrow
?
我想保留的是延迟加载东西然后使用dask
做数据转换的能力。
谢谢!
Dask 当然可以使用 fastparquet 后端从 GCS 读取 parquet (engine='fastparquet'
)。请注意,pyarrow 不会生成 fastparquet 期望的 _metadata
文件,因此您可以使用 fastparquet 写入数据,使用 fastparquet 从现有数据文件创建文件,或者传递一个指向所有数据文件而不是目录。
你正在做的事情也应该与 pyarrow 一起工作,因为 pyarrow 通常可以接受任何 python 类文件对象,但在这种情况下似乎试图制作一个 pyarrow 文件系统。您在上面看到的错误可能是错误,应该进行调查。
-编辑-
根据 OP 的评论,以下内容确实有效
pandas_df = pd.DataFrame({'x' : [2,3, 2], 'y': [1, 0, 0]})
dask_df = dd.from_pandas(pandas_df, npartitions=2)
dask_df.to_parquet("gcs://my_bucket/test", engine='fastparquet')
read_again_df = dd.read_parquet("gcs://my_bucket/test/", engine='fastparquet')
请注意,由于某些错误原因,dask_df.to_parquet()
需要使用 "gcs://my_bucket/test" 调用,不带“/”,否则 dd.read_parquet()
不起作用