Airflow - 在 2 个不同的项目中复制 blob from/to Google Cloud Storage

Airflow - Copy blob from/to Google Cloud Storage within 2 different projects

我正在尝试使用 Airflow 将项目 X 中的 GCS 存储桶 A 中的 blob 复制到项目 Y 中的存储桶 B。

可用运算符 (GCSToGCSOperator) 似乎只在同一项目中的两个存储桶之间运行良好。

在我的案例中如何实现复制?

我想避免使用 BashOperator...

谢谢!!

选项 1: 使用 CloudDataTransferServiceCreateJobOperator 使用 Google API. You can find information about it in docs. Note that this require the service account to have access to both. If this is not the case then it's not supported yet See

创建传输作业

选项 2: 对项目 1 使用 GCSToLocalFilesystemOperator,然后对项目 2 使用 LocalFilesystemToGCSOperator

此解决方案的框架:

from airflow import DAG
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator

with DAG(
    "example", schedule_interval="@daily", start_date=datetime(2021, 1, 1), catchup=False
) as dag:
    download = GCSToLocalFilesystemOperator(
        task_id="download_task",
        bucket='some_bucket',
        filename='/tmp/fake1.csv',
        object_name="test/test1.csv",
        gcp_conn_id='google_cloud_origin'
    )
    
    upload = LocalFilesystemToGCSOperator(
        task_id='upload_task',
        bucket='some_bucket',
        src='/tmp/fake1.csv',
        dst='test/test1.csv',
        gcp_conn_id='google_cloud_dest'
    )

    download >> upload

虽然这不是理想的解决方案。这实际上取决于您的工作量和频率。使用此解决方案,您可以通过本地磁盘传输文件 - 小批量就可以了。此解决方案适用于两个不同帐户的情况,因为每个操作员都关联到不同的 Google 连接。