记录 pyarrow 在 S3 上创建的镶木地板文件名
Log parquet filenames created by pyarrow on S3
我们正在使用 pyarrow 将数据附加到存储在 S3(分区)中的现有镶木地板数据集。这每小时在 AWS lambda 上运行几次。一个最小的例子是:
import pyarrow as pa
import pyarrow.parquet as pq
import s3fs
df = ... # Existing pandas df
table = pa.Table.from_pandas(df)
pq.write_to_dataset(
table,
filesystem=s3,
root_path=f"s3://s3-path/",
partition_cols=['year', "month"]
)
因此,根据内部数据值,许多镶木地板文件将写入 S3。我们的目标是通过输出结果 filename(S3 密钥)来跟踪哪些文件已写入文件系统。
有什么方法可以捕获 pyarrow
或 s3fs
写入的实际文件名? Parquet 文件名是根据计算的哈希名称任意命名的,我没有看到提到的两个包的任何日志记录功能。
从 0.15.0 开始,您可以在写入前为文件提供 partition_filename_cb
名称。
pyarrow.parquet.write_to_dataset(table, root_path, partition_cols=None, partition_filename_cb=None, filesystem=None, **kwargs)
如果您愿意也使用 AWS Data Wrangler:
import awswrangler as wr
paths = wr.pandas.to_parquet(
dataframe=df,
path="s3://...",
dataset=True,
database="my_database", # Optional, only with you want it available on Athena/Glue Catalog
table="my_table",
partition_cols=["PARTITION_COL_NAME"])["paths"]
print(paths)
只是为了澄清@Prabhakar Reddy 的回答....partition_filename_cb 参数需要一个回调函数。如果您希望提供如下所示的字符串,只需使用 lambda。
pyarrow.parquet.write_to_dataset(table, root_path, partition_cols=None, partition_filename_cb=lambda x: 'myfilename.parquet', filesystem=None, **kwargs)
我们正在使用 pyarrow 将数据附加到存储在 S3(分区)中的现有镶木地板数据集。这每小时在 AWS lambda 上运行几次。一个最小的例子是:
import pyarrow as pa
import pyarrow.parquet as pq
import s3fs
df = ... # Existing pandas df
table = pa.Table.from_pandas(df)
pq.write_to_dataset(
table,
filesystem=s3,
root_path=f"s3://s3-path/",
partition_cols=['year', "month"]
)
因此,根据内部数据值,许多镶木地板文件将写入 S3。我们的目标是通过输出结果 filename(S3 密钥)来跟踪哪些文件已写入文件系统。
有什么方法可以捕获 pyarrow
或 s3fs
写入的实际文件名? Parquet 文件名是根据计算的哈希名称任意命名的,我没有看到提到的两个包的任何日志记录功能。
从 0.15.0 开始,您可以在写入前为文件提供 partition_filename_cb
名称。
pyarrow.parquet.write_to_dataset(table, root_path, partition_cols=None, partition_filename_cb=None, filesystem=None, **kwargs)
如果您愿意也使用 AWS Data Wrangler:
import awswrangler as wr
paths = wr.pandas.to_parquet(
dataframe=df,
path="s3://...",
dataset=True,
database="my_database", # Optional, only with you want it available on Athena/Glue Catalog
table="my_table",
partition_cols=["PARTITION_COL_NAME"])["paths"]
print(paths)
只是为了澄清@Prabhakar Reddy 的回答....partition_filename_cb 参数需要一个回调函数。如果您希望提供如下所示的字符串,只需使用 lambda。
pyarrow.parquet.write_to_dataset(table, root_path, partition_cols=None, partition_filename_cb=lambda x: 'myfilename.parquet', filesystem=None, **kwargs)