从 pandas 上传到 S3 时如何添加标签?
How to add tags when uploading to S3 from pandas?
Pandas 允许您将 AWS S3 路径直接传递给 .to_csv()
和 .to_parquet()
。
有一个 storage_options
参数用于传递 S3 特定参数。
我想调用 .to_csv('s3://bucket/key.csv', storage_options=something)
并指定要应用于上传对象的 S3 对象标签,如 something
。
我已经阅读了文档,但我不知道如何做,
The pandas docs 不列出 storage_options
的可能值,它们只是指向 fsspec
。看起来 pandas 调用 fsspec,后者调用 s3fs,后者调用 aiobotocore,后者调用 botocore,并且可能调用 s3transfer。我怎样才能将 S3 标签参数一直传递到这个兔子洞里?
MWE
import pandas as pd
import boto3
bucket = 'mybucket' # change for your bucket
key = 'test/pandas/tags.csv'
tags = {'mytag': 'x'}
df = pd.DataFrame([{'a': 1}])
df.to_csv(f"s3://{bucket}/{key}") # try without any tags first
df.to_csv(f"s3://{bucket}/{key}", storage_options={'tags': tags})
resp = boto3.client('s3').get_object_tagging(Bucket=bucket, Key=key)
actual_tags = {t['Key']: t['Value'] for t in resp.get('TagSet', [])}
assert actual_tags == tags
预期行为
断言通过。 S3 对象具有标签 mytag
:x
实际行为
第二 .to_csv()
行失败。
即它可以在没有标签的情况下工作。标签是导致失败的原因。
Traceback (most recent call last):
File "upld.py", line 9, in <module>
df.to_csv(f"s3://{bucket}/{key}", storage_options={'tags': tags})
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/pandas/core/generic.py", line 3463, in to_csv
return DataFrameRenderer(formatter).to_csv(
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/pandas/io/formats/format.py", line 1105, in to_csv
csv_formatter.save()
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/pandas/io/formats/csvs.py", line 237, in save
with get_handle(
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/pandas/io/common.py", line 608, in get_handle
ioargs = _get_filepath_or_buffer(
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/pandas/io/common.py", line 357, in _get_filepath_or_buffer
file_obj = fsspec.open(
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/fsspec/core.py", line 456, in open
return open_files(
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/fsspec/core.py", line 299, in open_files
[fs.makedirs(parent, exist_ok=True) for parent in parents]
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/fsspec/core.py", line 299, in <listcomp>
[fs.makedirs(parent, exist_ok=True) for parent in parents]
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/fsspec/asyn.py", line 91, in wrapper
return sync(self.loop, func, *args, **kwargs)
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/fsspec/asyn.py", line 71, in sync
raise return_result
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/fsspec/asyn.py", line 25, in _runner
result[0] = await coro
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/s3fs/core.py", line 746, in _makedirs
await self._mkdir(path, create_parents=True)
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/s3fs/core.py", line 731, in _mkdir
await self._call_s3("create_bucket", **params)
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/s3fs/core.py", line 252, in _call_s3
await self.set_session()
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/s3fs/core.py", line 395, in set_session
self.session = aiobotocore.session.AioSession(**self.kwargs)
TypeError: __init__() got an unexpected keyword argument 'tags'
看起来这些参数正在传递给 aiobotocore 会话实例化,而不是来自 aiobotocore 的实际 S3 put_object API 调用。这让我觉得这是不可能的。
备选方案
我要不要试试:
storage_options={
'tags': {
'k': 'v'
}
}
或
storage_options={
'tags': [
{'Key': 'k', 'Value': 'v'}
]
}
当然我可以不带标签上传,然后作为单独的 boto 调用添加标签。
这不是原子的,并且成本是原来的两倍(对于小文件)。
如果有办法从上传中获取版本 ID,那将消除一些并发问题(并发写入)。
所以我花了一些时间在这方面进行挖掘。我在这里可能是错的,但我认为这是不可能的。这就是我相信的原因:
如果路径是不以 http
(see here) 开头的 url,storage_options
会传递给 fsspec
。然后这些选项通过 fsspec
传递到 s3fs.S3Filesystem
作为 kwargs
。然后 kwargs dead-end 带有错误消息中的函数。
(这是我可能出错的地方!)然后 S3FileSystem
调用 _put_file
来编写您的 csv。此函数不使用 self.kwargs
,但接收 function-level kwargs
不会被 pandas.
传递
因此,我认为无法通过 pandas 中的 to_X
访问标签。但是,值得在 Pandas/fsspec github 上提出问题以获取更多信息。
Pandas 允许您将 AWS S3 路径直接传递给 .to_csv()
和 .to_parquet()
。
有一个 storage_options
参数用于传递 S3 特定参数。
我想调用 .to_csv('s3://bucket/key.csv', storage_options=something)
并指定要应用于上传对象的 S3 对象标签,如 something
。
我已经阅读了文档,但我不知道如何做,
The pandas docs 不列出 storage_options
的可能值,它们只是指向 fsspec
。看起来 pandas 调用 fsspec,后者调用 s3fs,后者调用 aiobotocore,后者调用 botocore,并且可能调用 s3transfer。我怎样才能将 S3 标签参数一直传递到这个兔子洞里?
MWE
import pandas as pd
import boto3
bucket = 'mybucket' # change for your bucket
key = 'test/pandas/tags.csv'
tags = {'mytag': 'x'}
df = pd.DataFrame([{'a': 1}])
df.to_csv(f"s3://{bucket}/{key}") # try without any tags first
df.to_csv(f"s3://{bucket}/{key}", storage_options={'tags': tags})
resp = boto3.client('s3').get_object_tagging(Bucket=bucket, Key=key)
actual_tags = {t['Key']: t['Value'] for t in resp.get('TagSet', [])}
assert actual_tags == tags
预期行为
断言通过。 S3 对象具有标签 mytag
:x
实际行为
第二 .to_csv()
行失败。
即它可以在没有标签的情况下工作。标签是导致失败的原因。
Traceback (most recent call last):
File "upld.py", line 9, in <module>
df.to_csv(f"s3://{bucket}/{key}", storage_options={'tags': tags})
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/pandas/core/generic.py", line 3463, in to_csv
return DataFrameRenderer(formatter).to_csv(
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/pandas/io/formats/format.py", line 1105, in to_csv
csv_formatter.save()
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/pandas/io/formats/csvs.py", line 237, in save
with get_handle(
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/pandas/io/common.py", line 608, in get_handle
ioargs = _get_filepath_or_buffer(
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/pandas/io/common.py", line 357, in _get_filepath_or_buffer
file_obj = fsspec.open(
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/fsspec/core.py", line 456, in open
return open_files(
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/fsspec/core.py", line 299, in open_files
[fs.makedirs(parent, exist_ok=True) for parent in parents]
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/fsspec/core.py", line 299, in <listcomp>
[fs.makedirs(parent, exist_ok=True) for parent in parents]
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/fsspec/asyn.py", line 91, in wrapper
return sync(self.loop, func, *args, **kwargs)
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/fsspec/asyn.py", line 71, in sync
raise return_result
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/fsspec/asyn.py", line 25, in _runner
result[0] = await coro
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/s3fs/core.py", line 746, in _makedirs
await self._mkdir(path, create_parents=True)
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/s3fs/core.py", line 731, in _mkdir
await self._call_s3("create_bucket", **params)
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/s3fs/core.py", line 252, in _call_s3
await self.set_session()
File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/s3fs/core.py", line 395, in set_session
self.session = aiobotocore.session.AioSession(**self.kwargs)
TypeError: __init__() got an unexpected keyword argument 'tags'
看起来这些参数正在传递给 aiobotocore 会话实例化,而不是来自 aiobotocore 的实际 S3 put_object API 调用。这让我觉得这是不可能的。
备选方案
我要不要试试:
storage_options={
'tags': {
'k': 'v'
}
}
或
storage_options={
'tags': [
{'Key': 'k', 'Value': 'v'}
]
}
当然我可以不带标签上传,然后作为单独的 boto 调用添加标签。 这不是原子的,并且成本是原来的两倍(对于小文件)。 如果有办法从上传中获取版本 ID,那将消除一些并发问题(并发写入)。
所以我花了一些时间在这方面进行挖掘。我在这里可能是错的,但我认为这是不可能的。这就是我相信的原因:
如果路径是不以http
(see here) 开头的 url,storage_options
会传递给 fsspec
。然后这些选项通过 fsspec
传递到 s3fs.S3Filesystem
作为 kwargs
。然后 kwargs dead-end 带有错误消息中的函数。
(这是我可能出错的地方!)然后 S3FileSystem
调用 _put_file
来编写您的 csv。此函数不使用 self.kwargs
,但接收 function-level kwargs
不会被 pandas.
因此,我认为无法通过 pandas 中的 to_X
访问标签。但是,值得在 Pandas/fsspec github 上提出问题以获取更多信息。