运行 一个 Airflow Operator 的多次迭代

Run multiple iterations of one Airflow Operator

我正在构建一个系统,该系统应该列出远程 SFTP 服务器上的文件,然后将文件下载到本地。我希望它并行 运行 这样我就可以为每个要下载的文件启动一个作业,或者同时下载 10 个以上的文件。

我是 Airflow 的新手,还没有完全理解所有内容。我认为应该有一个解决方案来做到这一点,但我就是想不出来。

这是代码,目前我在一个 Operator 中下载所有文件,但据我所知,它没有使用多个 worker。

def transfer_files():
    for i in range(1, 11): 
        sftp.get(REMOTE_PATH + 'test_{}.csv'.format(i), LOCAL_PATH + 'test_{}.csv'.format(i))

假设您正在使用 PythonOperator,您可以启动多个 PythonOperator,它看起来像这样:

def get_my_file(i):
    sftp.get(REMOTE_PATH + 'test_{}.csv'.format(i), LOCAL_PATH + 'test_{}.csv'.format(i))

def transfer_files():
    for i in range(1, 11):
        task = PythonOperator(
            task_id='test_{}.csv'.format(i),
            python_callable=get_my_file,
            op_args=[i],
            dag=dag)