从 Google 云存储桶复制到 S3 桶
Copy from Google Cloud Storage Bucket to S3 Bucket
我已经设置了一个 airflow 工作流程,将一些文件从 s3 提取到 Google 云存储,然后运行 sql 查询的工作流程以在 Big Query 上创建新的 tables .在工作流结束时,我需要将最后一个大查询 table 的输出推送到 Google Cloud Storage,然后从那里推送到 S3。
我使用 BigQueryToCloudStorageOperator
python 运算符破解了 Big Query table 到 Google Cloud Storage 的传输,没有任何问题。然而,从 Google Cloud Storage 到 S3 的传输似乎是一条不太受欢迎的路线,我一直无法找到可以在我的 Airflow 工作流程中实现自动化的解决方案。
我知道 rsync
作为 gsutil
的一部分并且已经开始工作(参见 post )但我无法添加它进入我的工作流程。
我在计算引擎实例上有一个 dockerised 气流容器 运行。
非常感谢帮助解决这个问题。
非常感谢!
所以我们也使用 rsync
在 S3 和 GCS 之间移动数据,
您首先需要让 bash 脚本正常工作,例如 gsutil -m rsync -d -r gs://bucket/key s3://bucket/key
对于 s3,您还需要提供 AWS_ACCESS_KEY_ID
和 AWS_SECRET_ACCESS_KEY
作为环境变量。
然后定义您的 BashOperator 并将其放入您的 DAG 文件
rsync_yesterday = BashOperator(task_id='rsync_task_' + table,
bash_command='Your rsync script',
dag=dag)
Google 建议使用它的 transfer service
进行云平台之间的传输。您可以使用他们的 python API 以编程方式设置传输。这样数据就直接在 S3 和 google 云存储之间传输。使用 gsutil
和 rsync
的缺点是数据必须通过执行 rsync
命令的 machine/instance。这可能是一个瓶颈。
我需要使用 AWS Lambda 将对象从 GC 存储桶复制到 S3。
Python boto3 库 允许列出和下载 来自 GC 存储桶的对象。
以下是将 "sample-data-s3.csv" 对象从 GC 存储桶复制到 s3 存储桶的示例 lambda 代码。
import boto3
import io
s3 = boto3.resource('s3')
google_access_key_id="GOOG1EIxxMYKEYxxMQ"
google_access_key_secret="QifDxxMYSECRETKEYxxVU1oad1b"
gc_bucket_name="my_gc_bucket"
def get_gcs_objects(google_access_key_id, google_access_key_secret,
gc_bucket_name):
"""Gets GCS objects using boto3 SDK"""
client = boto3.client("s3", region_name="auto",
endpoint_url="https://storage.googleapis.com",
aws_access_key_id=google_access_key_id,
aws_secret_access_key=google_access_key_secret)
# Call GCS to list objects in gc_bucket_name
response = client.list_objects(Bucket=gc_bucket_name)
# Print object names
print("Objects:")
for blob in response["Contents"]:
print(blob)
object = s3.Object('my_aws_s3_bucket', 'sample-data-s3.csv')
f = io.BytesIO()
client.download_fileobj(gc_bucket_name,"sample-data.csv",f)
object.put(Body=f.getvalue())
def lambda_handler(event, context):
get_gcs_objects(google_access_key_id,google_access_key_secret,gc_bucket_name)
您可以循环 blob
从 GC 桶中下载所有对象。
希望这对想要使用 AWS lambda 将对象从 GC 存储桶传输到 s3 存储桶的人有所帮助。
最简单的整体选项是 gsutil rsync,但在某些情况下,rsync 可能会占用太多资源或速度不够快。
几个其他选择:
- 查看 Amazon 的数据传输服务,称为 Amazon S3 Transfer Acceleration,它将允许您将数据从 GCS 导入到 S3
- 使用Hadoop DistCp with DataProc to parallelize the copying process (see example)
我已经设置了一个 airflow 工作流程,将一些文件从 s3 提取到 Google 云存储,然后运行 sql 查询的工作流程以在 Big Query 上创建新的 tables .在工作流结束时,我需要将最后一个大查询 table 的输出推送到 Google Cloud Storage,然后从那里推送到 S3。
我使用 BigQueryToCloudStorageOperator
python 运算符破解了 Big Query table 到 Google Cloud Storage 的传输,没有任何问题。然而,从 Google Cloud Storage 到 S3 的传输似乎是一条不太受欢迎的路线,我一直无法找到可以在我的 Airflow 工作流程中实现自动化的解决方案。
我知道 rsync
作为 gsutil
的一部分并且已经开始工作(参见 post
我在计算引擎实例上有一个 dockerised 气流容器 运行。
非常感谢帮助解决这个问题。
非常感谢!
所以我们也使用 rsync
在 S3 和 GCS 之间移动数据,
您首先需要让 bash 脚本正常工作,例如 gsutil -m rsync -d -r gs://bucket/key s3://bucket/key
对于 s3,您还需要提供 AWS_ACCESS_KEY_ID
和 AWS_SECRET_ACCESS_KEY
作为环境变量。
然后定义您的 BashOperator 并将其放入您的 DAG 文件
rsync_yesterday = BashOperator(task_id='rsync_task_' + table,
bash_command='Your rsync script',
dag=dag)
Google 建议使用它的 transfer service
进行云平台之间的传输。您可以使用他们的 python API 以编程方式设置传输。这样数据就直接在 S3 和 google 云存储之间传输。使用 gsutil
和 rsync
的缺点是数据必须通过执行 rsync
命令的 machine/instance。这可能是一个瓶颈。
我需要使用 AWS Lambda 将对象从 GC 存储桶复制到 S3。
Python boto3 库 允许列出和下载 来自 GC 存储桶的对象。
以下是将 "sample-data-s3.csv" 对象从 GC 存储桶复制到 s3 存储桶的示例 lambda 代码。
import boto3
import io
s3 = boto3.resource('s3')
google_access_key_id="GOOG1EIxxMYKEYxxMQ"
google_access_key_secret="QifDxxMYSECRETKEYxxVU1oad1b"
gc_bucket_name="my_gc_bucket"
def get_gcs_objects(google_access_key_id, google_access_key_secret,
gc_bucket_name):
"""Gets GCS objects using boto3 SDK"""
client = boto3.client("s3", region_name="auto",
endpoint_url="https://storage.googleapis.com",
aws_access_key_id=google_access_key_id,
aws_secret_access_key=google_access_key_secret)
# Call GCS to list objects in gc_bucket_name
response = client.list_objects(Bucket=gc_bucket_name)
# Print object names
print("Objects:")
for blob in response["Contents"]:
print(blob)
object = s3.Object('my_aws_s3_bucket', 'sample-data-s3.csv')
f = io.BytesIO()
client.download_fileobj(gc_bucket_name,"sample-data.csv",f)
object.put(Body=f.getvalue())
def lambda_handler(event, context):
get_gcs_objects(google_access_key_id,google_access_key_secret,gc_bucket_name)
您可以循环 blob
从 GC 桶中下载所有对象。
希望这对想要使用 AWS lambda 将对象从 GC 存储桶传输到 s3 存储桶的人有所帮助。
最简单的整体选项是 gsutil rsync,但在某些情况下,rsync 可能会占用太多资源或速度不够快。
几个其他选择:
- 查看 Amazon 的数据传输服务,称为 Amazon S3 Transfer Acceleration,它将允许您将数据从 GCS 导入到 S3
- 使用Hadoop DistCp with DataProc to parallelize the copying process (see example)