存档文件的气流 DAG 任务创建递归文件夹
Airflow DAG Task to Archive files creates recursive folders
我有以下 Google GCS 到 GCS 文件的移动。在 Google GCS(存储)中,我的存储桶中有一个文件夹(我们称之为“bucket1”)。在 bucket1 中,有一个存档文件夹和一系列 json 个文件。
我正在尝试将 json 文件放入存档文件夹。我的问题是它也在制作存档文件夹的副本(加上里面的内容)。这是创建一个递归存档文件夹(即 Archive\Archive\Archive...)。这是任务:
archive_files = GoogleCloudStorageToGoogleCloudStorageOperator(
task_id='Archive_Files',
source_bucket='my_data',
source_object='*.json',
destination_bucket='my_data',
destination_object='Archive/',
move_object=True,
google_cloud_storage_conn_id='connection1',
dag=dag
)
如何告诉 Airflow,我只想要该文件夹中的文件,而不创建递归“存档”文件夹?
谢谢!
使用 bash 运算符创建任务,任务应如下所示:
t1 = BashOperator(
task_id='t1',
bash_command = f'gsutil mv gs://gcs_location/*.csv gs://gcs_location/archive/'
)
我有以下 Google GCS 到 GCS 文件的移动。在 Google GCS(存储)中,我的存储桶中有一个文件夹(我们称之为“bucket1”)。在 bucket1 中,有一个存档文件夹和一系列 json 个文件。
我正在尝试将 json 文件放入存档文件夹。我的问题是它也在制作存档文件夹的副本(加上里面的内容)。这是创建一个递归存档文件夹(即 Archive\Archive\Archive...)。这是任务:
archive_files = GoogleCloudStorageToGoogleCloudStorageOperator(
task_id='Archive_Files',
source_bucket='my_data',
source_object='*.json',
destination_bucket='my_data',
destination_object='Archive/',
move_object=True,
google_cloud_storage_conn_id='connection1',
dag=dag
)
如何告诉 Airflow,我只想要该文件夹中的文件,而不创建递归“存档”文件夹?
谢谢!
使用 bash 运算符创建任务,任务应如下所示:
t1 = BashOperator(
task_id='t1',
bash_command = f'gsutil mv gs://gcs_location/*.csv gs://gcs_location/archive/'
)