从 pandas 上传到 S3 时如何添加标签?

How to add tags when uploading to S3 from pandas?

Pandas 允许您将 AWS S3 路径直接传递给 .to_csv().to_parquet()。 有一个 storage_options 参数用于传递 S3 特定参数。

我想调用 .to_csv('s3://bucket/key.csv', storage_options=something) 并指定要应用于上传对象的 S3 对象标签,如 something。 我已经阅读了文档,但我不知道如何做,

The pandas docs 不列出 storage_options 的可能值,它们只是指向 fsspec。看起来 pandas 调用 fsspec,后者调用 s3fs,后者调用 aiobotocore,后者调用 botocore,并且可能调用 s3transfer。我怎样才能将 S3 标签参数一直传递到这个兔子洞里?

MWE

import pandas as pd
import boto3

bucket = 'mybucket' # change for your bucket
key = 'test/pandas/tags.csv'
tags = {'mytag': 'x'}

df = pd.DataFrame([{'a': 1}])
df.to_csv(f"s3://{bucket}/{key}") # try without any tags first
df.to_csv(f"s3://{bucket}/{key}", storage_options={'tags': tags})

resp = boto3.client('s3').get_object_tagging(Bucket=bucket, Key=key)
actual_tags = {t['Key']: t['Value'] for t in resp.get('TagSet', [])}
assert actual_tags == tags

预期行为

断言通过。 S3 对象具有标签 mytagx

实际行为

第二 .to_csv() 行失败。 即它可以在没有标签的情况下工作。标签是导致失败的原因。

Traceback (most recent call last):
  File "upld.py", line 9, in <module>
    df.to_csv(f"s3://{bucket}/{key}", storage_options={'tags': tags})
  File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/pandas/core/generic.py", line 3463, in to_csv
    return DataFrameRenderer(formatter).to_csv(
  File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/pandas/io/formats/format.py", line 1105, in to_csv
    csv_formatter.save()
  File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/pandas/io/formats/csvs.py", line 237, in save
    with get_handle(
  File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/pandas/io/common.py", line 608, in get_handle
    ioargs = _get_filepath_or_buffer(
  File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/pandas/io/common.py", line 357, in _get_filepath_or_buffer
    file_obj = fsspec.open(
  File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/fsspec/core.py", line 456, in open
    return open_files(
  File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/fsspec/core.py", line 299, in open_files
    [fs.makedirs(parent, exist_ok=True) for parent in parents]
  File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/fsspec/core.py", line 299, in <listcomp>
    [fs.makedirs(parent, exist_ok=True) for parent in parents]
  File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/fsspec/asyn.py", line 91, in wrapper
    return sync(self.loop, func, *args, **kwargs)
  File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/fsspec/asyn.py", line 71, in sync
    raise return_result
  File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/fsspec/asyn.py", line 25, in _runner
    result[0] = await coro
  File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/s3fs/core.py", line 746, in _makedirs
    await self._mkdir(path, create_parents=True)
  File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/s3fs/core.py", line 731, in _mkdir
    await self._call_s3("create_bucket", **params)
  File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/s3fs/core.py", line 252, in _call_s3
    await self.set_session()
  File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/s3fs/core.py", line 395, in set_session
    self.session = aiobotocore.session.AioSession(**self.kwargs)
TypeError: __init__() got an unexpected keyword argument 'tags'

看起来这些参数正在传递给 aiobotocore 会话实例化,而不是来自 aiobotocore 的实际 S3 put_object API 调用。这让我觉得这是不可能的。

备选方案

我要不要试试:

storage_options={
    'tags': {
        'k': 'v'
    }
}

storage_options={
    'tags': [
        {'Key': 'k', 'Value': 'v'}
    ]
}

当然我可以不带标签上传,然后作为单独的 boto 调用添加标签。 这不是原子的,并且成本是原来的两倍(对于小文件)。 如果有办法从上传中获取版本 ID,那将消除一些并发问题(并发写入)。

所以我花了一些时间在这方面进行挖掘。我在这里可能是错的,但我认为这是不可能的。这就是我相信的原因:

如果路径是不以 http (see here) 开头的 url,

storage_options 会传递给 fsspec。然后这些选项通过 fsspec 传递到 s3fs.S3Filesystem 作为 kwargs。然后 kwargs dead-end 带有错误消息中的函数。

(这是我可能出错的地方!)然后 S3FileSystem 调用 _put_file 来编写您的 csv。此函数不使用 self.kwargs,但接收 function-level kwargs 不会被 pandas.

传递

因此,我认为无法通过 pandas 中的 to_X 访问标签。但是,值得在 Pandas/fsspec github 上提出问题以获取更多信息。