如何使用 pyarrow 从 S3 读取镶木地板文件列表作为 pandas 数据帧?
How to read a list of parquet files from S3 as a pandas dataframe using pyarrow?
我使用 boto3
(1.4.4)、pyarrow
(0.4.1) 和 pandas
(0.20.3) 来实现这一点。
首先,我可以像这样在本地读取单个 parquet 文件:
import pyarrow.parquet as pq
path = 'parquet/part-r-00000-1e638be4-e31f-498a-a359-47d017a0059c.gz.parquet'
table = pq.read_table(path)
df = table.to_pandas()
我也可以像这样在本地读取 parquet 文件的目录:
import pyarrow.parquet as pq
dataset = pq.ParquetDataset('parquet/')
table = dataset.read()
df = table.to_pandas()
两者都很有魅力。现在我想通过存储在 S3 存储桶中的文件远程实现相同的目的。我希望这样的事情会奏效:
dataset = pq.ParquetDataset('s3n://dsn/to/my/bucket')
但事实并非如此:
OSError: Passed non-file path: s3n://dsn/to/my/bucket
看完pyarrow's documentation thoroughly, this does not seem possible at the moment。所以我想出了以下解决方案:
从 S3 读取单个文件并获取 pandas 数据帧:
import io
import boto3
import pyarrow.parquet as pq
buffer = io.BytesIO()
s3 = boto3.resource('s3')
s3_object = s3.Object('bucket-name', 'key/to/parquet/file.gz.parquet')
s3_object.download_fileobj(buffer)
table = pq.read_table(buffer)
df = table.to_pandas()
这是我从 S3 文件夹路径创建 pandas 数据框的笨拙的、未优化的解决方案:
import io
import boto3
import pandas as pd
import pyarrow.parquet as pq
bucket_name = 'bucket-name'
def download_s3_parquet_file(s3, bucket, key):
buffer = io.BytesIO()
s3.Object(bucket, key).download_fileobj(buffer)
return buffer
client = boto3.client('s3')
s3 = boto3.resource('s3')
objects_dict = client.list_objects_v2(Bucket=bucket_name, Prefix='my/folder/prefix')
s3_keys = [item['Key'] for item in objects_dict['Contents'] if item['Key'].endswith('.parquet')]
buffers = [download_s3_parquet_file(s3, bucket_name, key) for key in s3_keys]
dfs = [pq.read_table(buffer).to_pandas() for buffer in buffers]
df = pd.concat(dfs, ignore_index=True)
有没有更好的方法来实现这个?也许是某种使用 pyarrow 的 pandas 连接器?我想避免使用pyspark
,但如果没有其他解决方案,那么我会采用它。
您可以使用 dask 中的 s3fs,它实现了 s3 的文件系统接口。然后你可以像这样使用 ParquetDataset 的文件系统参数:
import s3fs
s3 = s3fs.S3FileSystem()
dataset = pq.ParquetDataset('s3n://dsn/to/my/bucket', filesystem=s3)
您应该按照 yjk21 的建议使用 s3fs
模块。但是,作为调用 ParquetDataset 的结果,您将获得一个 pyarrow.parquet.ParquetDataset 对象。要获得 Pandas DataFrame,您宁愿对其应用 .read_pandas().to_pandas()
:
import pyarrow.parquet as pq
import s3fs
s3 = s3fs.S3FileSystem()
pandas_dataframe = pq.ParquetDataset('s3://your-bucket/', filesystem=s3).read_pandas().to_pandas()
不用pyarrow也可以用boto3完成
import boto3
import io
import pandas as pd
# Read the parquet file
buffer = io.BytesIO()
s3 = boto3.resource('s3')
object = s3.Object('bucket_name','key')
object.download_fileobj(buffer)
df = pd.read_parquet(buffer)
print(df.head())
将云上的 parquet 数据读取到数据帧中的最简单方法可能是以这种方式使用 dask.dataframe:
import dask.dataframe as dd
df = dd.read_parquet('s3://bucket/path/to/data-*.parq')
dask.dataframe
可以read from Google Cloud Storage, Amazon S3, Hadoop file system and more!
谢谢!你的问题其实告诉了我很多。这就是我现在使用 pandas
(0.21.1) 的方式,它将调用 pyarrow
和 boto3
(1.3.1).
import boto3
import io
import pandas as pd
# Read single parquet file from S3
def pd_read_s3_parquet(key, bucket, s3_client=None, **args):
if s3_client is None:
s3_client = boto3.client('s3')
obj = s3_client.get_object(Bucket=bucket, Key=key)
return pd.read_parquet(io.BytesIO(obj['Body'].read()), **args)
# Read multiple parquets from a folder on S3 generated by spark
def pd_read_s3_multiple_parquets(filepath, bucket, s3=None,
s3_client=None, verbose=False, **args):
if not filepath.endswith('/'):
filepath = filepath + '/' # Add '/' to the end
if s3_client is None:
s3_client = boto3.client('s3')
if s3 is None:
s3 = boto3.resource('s3')
s3_keys = [item.key for item in s3.Bucket(bucket).objects.filter(Prefix=filepath)
if item.key.endswith('.parquet')]
if not s3_keys:
print('No parquet found in', bucket, filepath)
elif verbose:
print('Load parquets:')
for p in s3_keys:
print(p)
dfs = [pd_read_s3_parquet(key, bucket=bucket, s3_client=s3_client, **args)
for key in s3_keys]
return pd.concat(dfs, ignore_index=True)
然后可以通过
从S3中读取一个文件夹下的多个parquet
df = pd_read_s3_multiple_parquets('path/to/folder', 'my_bucket')
(我想可以大大简化这段代码。)
如果您愿意也使用 AWS Data Wrangler。
import awswrangler as wr
df = wr.s3.read_parquet(path="s3://...")
前提是你有正确的包设置
$ pip install pandas==1.1.0 pyarrow==1.0.0 s3fs==0.4.2
和您的 AWS 共享 config and credentials files configured appropriately
您可以马上使用pandas
:
import pandas as pd
df = pd.read_parquet("s3://bucket/key.parquet")
如果有多个 AWS profiles,您可能还需要设置
$ export AWS_DEFAULT_PROFILE=profile_under_which_the_bucket_is_accessible
这样您就可以访问您的存储桶。
我使用 boto3
(1.4.4)、pyarrow
(0.4.1) 和 pandas
(0.20.3) 来实现这一点。
首先,我可以像这样在本地读取单个 parquet 文件:
import pyarrow.parquet as pq
path = 'parquet/part-r-00000-1e638be4-e31f-498a-a359-47d017a0059c.gz.parquet'
table = pq.read_table(path)
df = table.to_pandas()
我也可以像这样在本地读取 parquet 文件的目录:
import pyarrow.parquet as pq
dataset = pq.ParquetDataset('parquet/')
table = dataset.read()
df = table.to_pandas()
两者都很有魅力。现在我想通过存储在 S3 存储桶中的文件远程实现相同的目的。我希望这样的事情会奏效:
dataset = pq.ParquetDataset('s3n://dsn/to/my/bucket')
但事实并非如此:
OSError: Passed non-file path: s3n://dsn/to/my/bucket
看完pyarrow's documentation thoroughly, this does not seem possible at the moment。所以我想出了以下解决方案:
从 S3 读取单个文件并获取 pandas 数据帧:
import io
import boto3
import pyarrow.parquet as pq
buffer = io.BytesIO()
s3 = boto3.resource('s3')
s3_object = s3.Object('bucket-name', 'key/to/parquet/file.gz.parquet')
s3_object.download_fileobj(buffer)
table = pq.read_table(buffer)
df = table.to_pandas()
这是我从 S3 文件夹路径创建 pandas 数据框的笨拙的、未优化的解决方案:
import io
import boto3
import pandas as pd
import pyarrow.parquet as pq
bucket_name = 'bucket-name'
def download_s3_parquet_file(s3, bucket, key):
buffer = io.BytesIO()
s3.Object(bucket, key).download_fileobj(buffer)
return buffer
client = boto3.client('s3')
s3 = boto3.resource('s3')
objects_dict = client.list_objects_v2(Bucket=bucket_name, Prefix='my/folder/prefix')
s3_keys = [item['Key'] for item in objects_dict['Contents'] if item['Key'].endswith('.parquet')]
buffers = [download_s3_parquet_file(s3, bucket_name, key) for key in s3_keys]
dfs = [pq.read_table(buffer).to_pandas() for buffer in buffers]
df = pd.concat(dfs, ignore_index=True)
有没有更好的方法来实现这个?也许是某种使用 pyarrow 的 pandas 连接器?我想避免使用pyspark
,但如果没有其他解决方案,那么我会采用它。
您可以使用 dask 中的 s3fs,它实现了 s3 的文件系统接口。然后你可以像这样使用 ParquetDataset 的文件系统参数:
import s3fs
s3 = s3fs.S3FileSystem()
dataset = pq.ParquetDataset('s3n://dsn/to/my/bucket', filesystem=s3)
您应该按照 yjk21 的建议使用 s3fs
模块。但是,作为调用 ParquetDataset 的结果,您将获得一个 pyarrow.parquet.ParquetDataset 对象。要获得 Pandas DataFrame,您宁愿对其应用 .read_pandas().to_pandas()
:
import pyarrow.parquet as pq
import s3fs
s3 = s3fs.S3FileSystem()
pandas_dataframe = pq.ParquetDataset('s3://your-bucket/', filesystem=s3).read_pandas().to_pandas()
不用pyarrow也可以用boto3完成
import boto3
import io
import pandas as pd
# Read the parquet file
buffer = io.BytesIO()
s3 = boto3.resource('s3')
object = s3.Object('bucket_name','key')
object.download_fileobj(buffer)
df = pd.read_parquet(buffer)
print(df.head())
将云上的 parquet 数据读取到数据帧中的最简单方法可能是以这种方式使用 dask.dataframe:
import dask.dataframe as dd
df = dd.read_parquet('s3://bucket/path/to/data-*.parq')
dask.dataframe
可以read from Google Cloud Storage, Amazon S3, Hadoop file system and more!
谢谢!你的问题其实告诉了我很多。这就是我现在使用 pandas
(0.21.1) 的方式,它将调用 pyarrow
和 boto3
(1.3.1).
import boto3
import io
import pandas as pd
# Read single parquet file from S3
def pd_read_s3_parquet(key, bucket, s3_client=None, **args):
if s3_client is None:
s3_client = boto3.client('s3')
obj = s3_client.get_object(Bucket=bucket, Key=key)
return pd.read_parquet(io.BytesIO(obj['Body'].read()), **args)
# Read multiple parquets from a folder on S3 generated by spark
def pd_read_s3_multiple_parquets(filepath, bucket, s3=None,
s3_client=None, verbose=False, **args):
if not filepath.endswith('/'):
filepath = filepath + '/' # Add '/' to the end
if s3_client is None:
s3_client = boto3.client('s3')
if s3 is None:
s3 = boto3.resource('s3')
s3_keys = [item.key for item in s3.Bucket(bucket).objects.filter(Prefix=filepath)
if item.key.endswith('.parquet')]
if not s3_keys:
print('No parquet found in', bucket, filepath)
elif verbose:
print('Load parquets:')
for p in s3_keys:
print(p)
dfs = [pd_read_s3_parquet(key, bucket=bucket, s3_client=s3_client, **args)
for key in s3_keys]
return pd.concat(dfs, ignore_index=True)
然后可以通过
从S3中读取一个文件夹下的多个parquetdf = pd_read_s3_multiple_parquets('path/to/folder', 'my_bucket')
(我想可以大大简化这段代码。)
如果您愿意也使用 AWS Data Wrangler。
import awswrangler as wr
df = wr.s3.read_parquet(path="s3://...")
前提是你有正确的包设置
$ pip install pandas==1.1.0 pyarrow==1.0.0 s3fs==0.4.2
和您的 AWS 共享 config and credentials files configured appropriately
您可以马上使用pandas
:
import pandas as pd
df = pd.read_parquet("s3://bucket/key.parquet")
如果有多个 AWS profiles,您可能还需要设置
$ export AWS_DEFAULT_PROFILE=profile_under_which_the_bucket_is_accessible
这样您就可以访问您的存储桶。