Airflow 操作员将许多文件(目录、前缀)从 Google Cloud Storage 存储桶复制到本地文件系统

Airflow operator to copy many files (directory, prefix) from Google Cloud Storage bucket to local filesystem

有一个 Airflow 操作员 GCSToLocalFilesystemOperator 可以将一个文件从 GCS 存储桶复制到本地文件系统。但它只支持一个文件,并且不能为给定的前缀复制多个文件。

有一个反向运算符 LocalFilesystemToGCSOperator 允许将许多文件从本地文件系统复制到存储桶,只需在路径“/*”中加上星号即可。

您知道在 Airflow 中通过前缀将文件从存储桶复制到本地文件系统的最佳方法是什么吗?我是不是遗漏了什么,或者它不是因为某种原因才实现的?

到目前为止,我提出的解决方案是在将文件放入存储桶之前对其进行压缩,使用 airflow 作为一个文件下载并在本地使用 BashOperator 解压缩。请问有没有更好的办法

我能够使用以下方法成功地将多个文件从 GCS 存储桶复制到本地文件系统(映射)以获取 Airflow 中给定的前缀。

import datetime

from airflow import models
from airflow.operators import bash
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.operators import python


YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
BUCKET_NAME = 'qpalzm-bucket'
GCS_FILES = ['luffy.jpg', 'zoro.jpg']
LOCAL_PATH = '/home/airflow/gcs/data'
PREFIX = 'testfolder'

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}
#
with models.DAG(
        'multi_copy_gcs_to_local',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    def multi_copy(**kwargs):
        hook = GCSHook()

        for gcs_file in GCS_FILES:
            #initialize file name and the local directory where it will be copied
            filename = f'{LOCAL_PATH}/{gcs_file}'
            
            #check if PREFIX is available and initialize the gcs file to be copied
            if PREFIX:
                object_name = f'{PREFIX}/{gcs_file}'
            
            else:
                object_name = f'{gcs_file}'

            #perform gcs hook download
            hook.download(
                bucket_name = BUCKET_NAME,
                object_name = object_name,
                filename = filename
            )

    #execute multi_copy method
    multi_copy_op = python.PythonOperator(
            task_id='multi_gcs_to_local',
            provide_context=True,
            python_callable=multi_copy,
            )

    multi_copy_op

输出: