Pyarrow s3fs partition by timestamp
Is it possible to use a timestamp field in a pyarrow table to partition the parquet files when writing them to s3?
As far as I know: no.
pyarrow can read partitioned data, but that does not help with writing.
The write functions are documented in a few places, and none of them take a partitioning option.
https://github.com/apache/arrow/blob/master/python/pyarrow/parquet.py#L941
I was able to do this with the pyarrow write_to_dataset function, which lets you specify partition columns so that subdirectories are created for each partition value.
Example:
import s3fs
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

access_key = <access_key>
secret_key = <secret_key>
bucket_name = <bucket_name>

fs = s3fs.S3FileSystem(key=access_key, secret=secret_key)
bucket_uri = 's3://{0}/{1}'.format(bucket_name, "data")

data = {'date': ['2018-03-04T14:12:15.653Z', '2018-03-03T14:12:15.653Z', '2018-03-02T14:12:15.653Z', '2018-03-05T14:12:15.653Z'],
        'battles': [34, 25, 26, 57],
        'citys': ['london', 'newyork', 'boston', 'boston']}
df = pd.DataFrame(data, columns=['date', 'battles', 'citys'])

# Parse the ISO timestamps and derive the partition columns from them.
df['date'] = pd.to_datetime(df['date'], format="%Y-%m-%dT%H:%M:%S.%fZ")
df['year'] = df['date'].dt.year
df['month'] = df['date'].dt.month
df['day'] = df['date'].dt.day

table = pa.Table.from_pandas(df)

# partition_cols creates year=/month=/day= subdirectories under bucket_uri.
pq.write_to_dataset(table, bucket_uri, filesystem=fs,
                    partition_cols=['year', 'month', 'day'],
                    use_dictionary=True, compression='snappy',
                    use_deprecated_int96_timestamps=True)