在 TriggerDagRunOperator 中提供上下文

Providing context in TriggerDagRunOperator

我有一个 dag 被另一个 dag 触发了。我已经通过 DagRunOrder().payload 字典向这个 dag 传递了一些配置变量,就像 official example 所做的一样。

现在在这个 dag 中,我有另一个 TriggerDagRunOperator 来启动第二个 dag,并希望通过这些相同的配置变量。

我已经成功访问​​了 PythonOperator 中的有效负载变量,如下所示:

def run_this_func(ds, **kwargs):
    print("Remotely received value of {} for message and {} for day".format(
        kwargs["dag_run"].conf["message"], kwargs["dag_run"].conf["day"])
    )

run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag
)

但同样的模式在 TriggerDagRunOperator:

中不起作用
def trigger(context, dag_run_obj, **kwargs):
    dag_run_obj.payload = {
        "message": kwargs["dag_run"].conf["message"],
        "day": kwargs["dag_run"].conf["day"]
    }
    return dag_run_obj

trigger_step = TriggerDagRunOperator(
    task_id="trigger_modelling",
    trigger_dag_id="Dummy_Modelling",
    provide_context=True,
    python_callable=trigger,
    dag=dag
)

它会产生关于使用 provide_context:

的警告
INFO - Subtask: /usr/local/lib/python2.7/dist-packages/airflow/models.py:1927: PendingDeprecationWarning: Invalid arguments were passed to TriggerDagRunOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:
INFO - Subtask: *args: ()
INFO - Subtask: **kwargs: {'provide_context': True}
INFO - Subtask:   category=PendingDeprecationWarning

这个错误表明我还没有通过 conf :

INFO - Subtask: Traceback (most recent call last):
INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1374, in run
INFO - Subtask:     result = task_copy.execute(context=context)
INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/airflow/operators/dagrun_operator.py", line 64, in execute
INFO - Subtask:     dro = self.python_callable(context, dro)
INFO - Subtask:   File "/home/user/airflow/dags/dummy_responses.py", line 28, in trigger
INFO - Subtask:     "message": kwargs["dag_run"].conf["message"],
INFO - Subtask: KeyError: 'dag_run'

我尝试过的第二种模式也没有用,它使用 params 参数,如下所示:

def trigger(context, dag_run_obj):
    dag_run_obj.payload = {
        "message": context['params']['message'],
        "day": context['params']['day']
    }
    return dag_run_obj

trigger_step = TriggerDagRunOperator(
    task_id="trigger_modelling",
    trigger_dag_id="Dummy_Modelling",
    python_callable=trigger,
    params={
        "message": "{{ dag_run.conf['message'] }}",
        "day": "{{ dag_run.conf['day'] }}"
    },
    dag=dag
)

此模式不会产生错误,而是将参数作为字符串传递给下一个 dag,即它不会计算表达式。


如何访问第二个 dag 的 TriggerDagRunOperator 中的配置变量?

已解决:

dag_run 对象存储在上下文中,因此可以使用以下模式在 TriggerDagRunOperatorpython_callable 中访问配置变量:

def trigger(context, dag_run_obj):
    dag_run_obj.payload = {
        "message": context["dag_run"].conf["message"],
        "day": context["dag_run"].conf["day"]
    }
    return dag_run_obj

trigger_step = TriggerDagRunOperator(
    task_id="trigger_modelling",
    trigger_dag_id="Dummy_Modelling",
    python_callable=trigger,
    dag=dag
)

@efbbrown 是的,要么你可以这样做,要么每当你访问第一个 dags 参数时将其推送到 xcom 并在触发第二个 dag 时将其拉出

Airflow2.0.x中,相当于@efbbrown的答案是:

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger_step = TriggerDagRunOperator(
    task_id="trigger_modelling",
    trigger_dag_id="Dummy_Modelling",
    conf={"message": "{{ dag_run.conf['message'] }}", "day":"{{ 
    dag_run.conf['day'] }}"},
    dag=dag
)

此处在 GitHub 上描述了拉取请求。

请参阅 external-triggers and for trigger_dagrun 的文档。

这里是关于主题的 YouTube video,显示了正确的导入。