我们如何使用 TriggerDagRunOperator 触发多个气流 dag?

How do we trigger multiple airflow dags using TriggerDagRunOperator?

我有一个场景,其中一个特定的 dag 在完成时需要触发多个 dag,已经使用 TriggerDagRunOperator 触发单个 dag,是否可以将多个 dag 传递给 TriggerDagRunOperator 以触发多个 dag?

是否可以只在当前dag成功完成后触发

作为 API docs 状态,该方法接受单个 dag_id。但是,如果您想在完成后无条件地启动下游 DAG,为什么不将这些任务放在一个 DAG 中并在那里设置您的 dependencies/workflow 呢?然后您可以在适当的地方设置 depends_on_past=True

编辑:如果您绝对需要在单独的 DAG 中使用它们,那么简单的解决方法是创建多个 TriggerDagRunOperators 并将它们的依赖项设置为同一任务。

你可以试试循环播放!例如:

for i in list:

trigger_dag =TriggerDagRunOperator(task_id='trigger_'+ i, 
                                trigger_dag_id=i,
                                python_callable=conditionally_trigger_non_indr,
                                dag=dag)

根据所需的任务进行设置。我已经为 PythonOperator 自动化了类似这样的东西。如果这对你有用,你可以试试!

我遇到了同样的问题。并且没有开箱即用的解决方案,但我们可以为其编写自定义运算符。

这里是自定义运算符的代码,它获取 python_callabletrigger_dag_id 作为参数:

class TriggerMultiDagRunOperator(TriggerDagRunOperator):

    @apply_defaults
    def __init__(self, op_args=None, op_kwargs=None, *args, **kwargs):
        super(TriggerMultiDagRunOperator, self).__init__(*args, **kwargs)
        self.op_args = op_args or []
        self.op_kwargs = op_kwargs or {}

    def execute(self, context):
        session = settings.Session()
        created = False
        for dro in self.python_callable(context, *self.op_args, **self.op_kwargs):
            if not dro or not isinstance(dro, DagRunOrder):
                break

            if dro.run_id is None:
                dro.run_id = 'trig__' + datetime.utcnow().isoformat()

            dbag = DagBag(settings.DAGS_FOLDER)
            trigger_dag = dbag.get_dag(self.trigger_dag_id)
            dr = trigger_dag.create_dagrun(
                run_id=dro.run_id,
                state=State.RUNNING,
                conf=dro.payload,
                external_trigger=True
            )
            created = True
            self.log.info("Creating DagRun %s", dr)

        if created is True:
            session.commit()
        else:
            self.log.info("No DagRun created")
        session.close()

trigger_dag_id 是我们多次 运行 想要的 dag id。

python_callable 是一个函数,它应该 return 一个 DagRunOrder 对象的列表,一个对象用于调度一个具有 dag_id trigger_dag_id 的 DAG 实例.

GitHub 上的代码和示例:https://github.com/mastak/airflow_multi_dagrun 关于这段代码的更多描述:https://medium.com/@igorlubimov/dynamic-scheduling-in-airflow-52979b3e6b13