如何在 Airflow 中的单个 运行 中 运行 相同的 dag 两次

How to run same dag two times in a single run in Airflow

我对 Airflow 完全陌生。我有一个要求,我必须 运行 两个 EMR 作业。 .目前我有一个 python 脚本,它依赖于一些输入文件,如果存在它会触发 EMR 作业。

我的新要求是,我将不得不输入不同的文件(相同类型),这两个文件将被输入到 emr 作业中,在这两种情况下,spark 将做同样的事情,但只有输入文件不同。

create_job_workflow = EmrCreateJobFlowOperator(
    task_id='some-task',
    job_flow_overrides=job_flow_args,
    aws_conn_id=aws_conn,
    emr_conn_id=emr_conn,
    dag=dag
)

我可以通过仅更改 spark-submit 中的输入文件来实现 运行 两个相同的 dag 运行 吗? 'trigger DAG' 它将采用两个不同的输入文件并在两个不同的 emr 集群中触发两个不同的 emr 作业。或者你能给我一些最好的做法吗?或者通过改变 max_active_runs=2

怎么可能

最佳做法是为其设置两个不同的任务。通过设置 max_active_runs=2,您只需将并发数 dag_runs 限制为 2。您可以借助任何数据结构来为您的任务设置配置,迭代它并基于每个属性。

您可以做的另一件事:

您可以接收文件名作为您的 dag 的有效负载 访问方式如下:context['dag_run'].conf.get('filename')

并使用触发器 dag_run 运算符重新触发相同的 dag,用另一个文件更新所需的有效载荷