存档文件的气流 DAG 任务创建递归文件夹

Airflow DAG Task to Archive files creates recursive folders

我有以下 Google GCS 到 GCS 文件的移动。在 Google GCS(存储)中,我的存储桶中有一个文件夹(我们称之为“bucket1”)。在 bucket1 中,有一个存档文件夹和一系列 json 个文件。

我正在尝试将 json 文件放入存档文件夹。我的问题是它也在制作存档文件夹的副本(加上里面的内容)。这是创建一个递归存档文件夹(即 Archive\Archive\Archive...)。这是任务:

archive_files = GoogleCloudStorageToGoogleCloudStorageOperator(
    task_id='Archive_Files',
    source_bucket='my_data',
    source_object='*.json',
    destination_bucket='my_data',
    destination_object='Archive/',
    move_object=True,
    google_cloud_storage_conn_id='connection1',
    dag=dag
)

如何告诉 Airflow,我只想要该文件夹中的文件,而不创建递归“存档”文件夹?

谢谢!

使用 bash 运算符创建任务,任务应如下所示:

t1 = BashOperator(
        task_id='t1',
        bash_command = f'gsutil mv gs://gcs_location/*.csv gs://gcs_location/archive/'
    )