从 Google 云存储桶复制到 S3 桶

Copy from Google Cloud Storage Bucket to S3 Bucket

我已经设置了一个 airflow 工作流程,将一些文件从 s3 提取到 Google 云存储,然后运行 ​​sql 查询的工作流程以在 Big Query 上创建新的 tables .在工作流结束时,我需要将最后一个大查询 table 的输出推送到 Google Cloud Storage,然后从那里推送到 S3。

我使用 BigQueryToCloudStorageOperator python 运算符破解了 Big Query table 到 Google Cloud Storage 的传输,没有任何问题。然而,从 Google Cloud Storage 到 S3 的传输似乎是一条不太受欢迎的路线,我一直无法找到可以在我的 Airflow 工作流程中实现自动化的解决方案。

我知道 rsync 作为 gsutil 的一部分并且已经开始工作(参见 post )但我无法添加它进入我的工作流程。

我在计算引擎实例上有一个 dockerised 气流容器 运行。

非常感谢帮助解决这个问题。

非常感谢!

所以我们也使用 rsync 在 S3 和 GCS 之间移动数据,

您首先需要让 bash 脚本正常工作,例如 gsutil -m rsync -d -r gs://bucket/key s3://bucket/key

对于 s3,您还需要提供 AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEY 作为环境变量。

然后定义您的 BashOperator 并将其放入您的 DAG 文件

rsync_yesterday = BashOperator(task_id='rsync_task_' + table,
                                bash_command='Your rsync script',
                                dag=dag)

Google 建议使用它的 transfer service 进行云平台之间的传输。您可以使用他们的 python API 以编程方式设置传输。这样数据就直接在 S3 和 google 云存储之间传输。使用 gsutilrsync 的缺点是数据必须通过执行 rsync 命令的 machine/instance。这可能是一个瓶颈。

Google Cloud Storage Transfer Service Doc

我需要使用 AWS Lambda 将对象从 GC 存储桶复制到 S3。

Python boto3 库 允许列出和下载 来自 GC 存储桶的对象。

以下是将 "sample-data-s3.csv" 对象从 GC 存储桶复制到 s3 存储桶的示例 lambda 代码。

import boto3
import io

s3 = boto3.resource('s3')

google_access_key_id="GOOG1EIxxMYKEYxxMQ"
google_access_key_secret="QifDxxMYSECRETKEYxxVU1oad1b"

gc_bucket_name="my_gc_bucket"


def get_gcs_objects(google_access_key_id, google_access_key_secret,
                     gc_bucket_name):
    """Gets GCS objects using boto3 SDK"""
    client = boto3.client("s3", region_name="auto",
                          endpoint_url="https://storage.googleapis.com",
                          aws_access_key_id=google_access_key_id,
                          aws_secret_access_key=google_access_key_secret)

    # Call GCS to list objects in gc_bucket_name
    response = client.list_objects(Bucket=gc_bucket_name)

    # Print object names
    print("Objects:")
    for blob in response["Contents"]:
        print(blob)    

    object = s3.Object('my_aws_s3_bucket', 'sample-data-s3.csv')
    f = io.BytesIO()
    client.download_fileobj(gc_bucket_name,"sample-data.csv",f)
    object.put(Body=f.getvalue())

def lambda_handler(event, context):
    get_gcs_objects(google_access_key_id,google_access_key_secret,gc_bucket_name) 

您可以循环 blob 从 GC 桶中下载所有对象。

希望这对想要使用 AWS lambda 将对象从 GC 存储桶传输到 s3 存储桶的人有所帮助。

最简单的整体选项是 gsutil rsync,但在某些情况下,rsync 可能会占用太多资源或速度不够快。

几个其他选择: