用于 Redshift 的 fastparquet 导出

fastparquet export for Redshift

我有一个很简单的想法:使用Python Pandas(为了方便)做一些简单的数据库操作,数据量适中,然后将数据以Parquet格式写回S3。 然后,数据应该作为外部 table 暴露给 Redshift,以免从实际的 Redshift 集群中占用存储空间 space。

我找到了两种方法。


给定数据:

data = {
    'int': [1, 2, 3, 4, None],
    'float': [1.1, None, 3.4, 4.0, 5.5],
    'str': [None, 'two', 'three', 'four', 'five'],
    'boolean': [True, None, True, False, False],
    'date': [
        date(2000, 1, 1),
        date(2000, 1, 2),
        date(2000, 1, 3),
        date(2000, 1, 4),
        None,
    ],
    'timestamp': [
        datetime(2000, 1, 1, 1, 1, 1),
        datetime(2000, 1, 1, 1, 1, 2),
        None,
        datetime(2000, 1, 1, 1, 1, 4),
        datetime(2000, 1, 1, 1, 1, 5),
    ]
}

df = pd.DataFrame(data)

df['int'] = df['int'].astype(pd.Int64Dtype())
df['date'] = df['date'].astype('datetime64[D]')
df['timestamp'] = df['timestamp'].astype('datetime64[s]')

最后的类型转换在这两种情况下都是必要的,以断言 Pandas' 类型识别不会干扰。

使用 PyArrow:

使用 Pyarrow,你可以这样做:

import pyarrow as pa

pyarrow_schema = pa.schema([
    ('int', pa.int64()),
    ('float', pa.float64()),
    ('str', pa.string()),
    ('bool', pa.bool_()),
    ('date', pa.date64()),
    ('timestamp', pa.timestamp(unit='s'))
])

df.to_parquet(
    path='pyarrow.parquet',
    schema=pyarrow_schema,
    engine='pyarrow'
)

为什么使用 PyArrow:Pandas' Parquet 导出的默认引擎是 PyArrow,因此您可以期待良好的集成。此外,PyArrow 提供了广泛的功能并适用于多种数据类型。

使用 fastparquet:

首先,您需要通过以下额外步骤写出数据:

from fastparquet import write

write('fast.parquet', df, has_nulls=True, times='int96')

这里重要的一点是 'times' 参数。请参阅 this post,我在其中找到了 'date' 列的补救措施。

为什么使用 fastparquet:Fastparquet 比 PyArrow 有更多限制,尤其是在接受数据类型方面。另一方面,包裹要小得多。

外部table:

鉴于您已将数据导出到 Parquet 并将其存储在 S3 中,然后您可以像这样将其公开给 Redshift:

CREATE EXTERNAL TABLE "<your_external_schema>"."<your_table_name>" (
 "int" bigint,
 "float" float,
 "str" varchar(255),
 "boolean" bool,
 "date" date,
 "timestamp" timestamp)
 ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
location 
  's3://<your_bucket>/<your_prefix>/';

最后的故事和注释:

当我开始在 AWS Lambda 函数的上下文中使用 Pandas、Parquet 和外部 Redshift tables 时,有一段时间一切都很好。直到我到达某个点,我的 Lambda 包的捆绑包达到了允许的限制 (Deployment package size)。检查我的哪些依赖项弥补了所有这些,我发现 PyArrow、Pandas 和 Numpy(Pandas 的依赖项)是罪魁祸首。虽然我绝对不能放弃 Numpy(为了提高效率)并且不想松开 Pandas(再次方便),但我希望用更轻量级的东西替换 PyArrow。 Et voila:Fastparquet。经过一些研究和大量实验,我可以使它也起作用。

我希望这个解释和资源对其他一些人有所帮助。

问题已有答案。 :)