Airflow 操作员将许多文件(目录、前缀)从 Google Cloud Storage 存储桶复制到本地文件系统
Airflow operator to copy many files (directory, prefix) from Google Cloud Storage bucket to local filesystem
有一个 Airflow 操作员 GCSToLocalFilesystemOperator
可以将一个文件从 GCS 存储桶复制到本地文件系统。但它只支持一个文件,并且不能为给定的前缀复制多个文件。
有一个反向运算符 LocalFilesystemToGCSOperator
允许将许多文件从本地文件系统复制到存储桶,只需在路径“/*”中加上星号即可。
您知道在 Airflow 中通过前缀将文件从存储桶复制到本地文件系统的最佳方法是什么吗?我是不是遗漏了什么,或者它不是因为某种原因才实现的?
到目前为止,我提出的解决方案是在将文件放入存储桶之前对其进行压缩,使用 airflow 作为一个文件下载并在本地使用 BashOperator
解压缩。请问有没有更好的办法
我能够使用以下方法成功地将多个文件从 GCS 存储桶复制到本地文件系统(映射)以获取 Airflow 中给定的前缀。
import datetime
from airflow import models
from airflow.operators import bash
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.operators import python
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
BUCKET_NAME = 'qpalzm-bucket'
GCS_FILES = ['luffy.jpg', 'zoro.jpg']
LOCAL_PATH = '/home/airflow/gcs/data'
PREFIX = 'testfolder'
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': YESTERDAY,
}
#
with models.DAG(
'multi_copy_gcs_to_local',
catchup=False,
default_args=default_args,
schedule_interval=datetime.timedelta(days=1)) as dag:
def multi_copy(**kwargs):
hook = GCSHook()
for gcs_file in GCS_FILES:
#initialize file name and the local directory where it will be copied
filename = f'{LOCAL_PATH}/{gcs_file}'
#check if PREFIX is available and initialize the gcs file to be copied
if PREFIX:
object_name = f'{PREFIX}/{gcs_file}'
else:
object_name = f'{gcs_file}'
#perform gcs hook download
hook.download(
bucket_name = BUCKET_NAME,
object_name = object_name,
filename = filename
)
#execute multi_copy method
multi_copy_op = python.PythonOperator(
task_id='multi_gcs_to_local',
provide_context=True,
python_callable=multi_copy,
)
multi_copy_op
输出:
有一个 Airflow 操作员 GCSToLocalFilesystemOperator
可以将一个文件从 GCS 存储桶复制到本地文件系统。但它只支持一个文件,并且不能为给定的前缀复制多个文件。
有一个反向运算符 LocalFilesystemToGCSOperator
允许将许多文件从本地文件系统复制到存储桶,只需在路径“/*”中加上星号即可。
您知道在 Airflow 中通过前缀将文件从存储桶复制到本地文件系统的最佳方法是什么吗?我是不是遗漏了什么,或者它不是因为某种原因才实现的?
到目前为止,我提出的解决方案是在将文件放入存储桶之前对其进行压缩,使用 airflow 作为一个文件下载并在本地使用 BashOperator
解压缩。请问有没有更好的办法
我能够使用以下方法成功地将多个文件从 GCS 存储桶复制到本地文件系统(映射)以获取 Airflow 中给定的前缀。
import datetime
from airflow import models
from airflow.operators import bash
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.operators import python
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
BUCKET_NAME = 'qpalzm-bucket'
GCS_FILES = ['luffy.jpg', 'zoro.jpg']
LOCAL_PATH = '/home/airflow/gcs/data'
PREFIX = 'testfolder'
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': YESTERDAY,
}
#
with models.DAG(
'multi_copy_gcs_to_local',
catchup=False,
default_args=default_args,
schedule_interval=datetime.timedelta(days=1)) as dag:
def multi_copy(**kwargs):
hook = GCSHook()
for gcs_file in GCS_FILES:
#initialize file name and the local directory where it will be copied
filename = f'{LOCAL_PATH}/{gcs_file}'
#check if PREFIX is available and initialize the gcs file to be copied
if PREFIX:
object_name = f'{PREFIX}/{gcs_file}'
else:
object_name = f'{gcs_file}'
#perform gcs hook download
hook.download(
bucket_name = BUCKET_NAME,
object_name = object_name,
filename = filename
)
#execute multi_copy method
multi_copy_op = python.PythonOperator(
task_id='multi_gcs_to_local',
provide_context=True,
python_callable=multi_copy,
)
multi_copy_op
输出: