Apache Airflow - 如何在使用 TriggerDagRunOperator 触发的流中检索运算符外部的 dag_run 数据

Apache Airflow - How to retrieve dag_run data outside an operator in a flow triggered with TriggerDagRunOperator

我设置了两个 DAG,我们称第一个为 orchestrator,第二个为 worker。 Orchestrator 的工作是从 API 中检索一个列表,并针对此列表中的每个元素,使用一些参数触发 worker DAG。

我将这两个工作流分开的原因是我希望能够仅重放失败的 "worker" 个工作流(如果一个失败,我不想重放所有工作实例)。

我能够让事情正常进行,但现在我看到监控有多么困难,因为我的 task_id 对所有人来说都是一样的,所以我决定使用动态 task_id 基于"orchestrator" 工作流程从 API 检索到的值。

但是,我无法从运算符外部的 dag_run 对象中检索值。基本上,我希望它能工作:

with models.DAG('specific_workflow', schedule_interval=None, default_args=default_dag_args) as dag:
    name = context['dag_run'].name
    hello_world = BashOperator(task_id='hello_{}'.format(name), bash_command="echo Hello {{ dag_run.conf.name }}", dag=dag)
    bye = BashOperator(task_id='bye_{}'.format(name), bash_command="echo Goodbye {{ dag_run.conf.name }}", dag=dag)

    hello_world >> bye

但是我无法定义这个 "context" 对象。但是,我可以从操作员(例如 PythonOperator 和 BashOperator)访问它。

是否可以在运算符之外检索 dag_run 对象?

我认为目前不太可能。例如,作为 worker 运行 进程的一部分,检索 DAG 时没有提供任何 TaskInstance 上下文,除了在哪里可以找到 DAG:https://github.com/apache/incubator-airflow/blob/f18e2550543e455c9701af0995bc393ee6a97b47/airflow/bin/cli.py#L353

上下文稍后注入:https://github.com/apache/incubator-airflow/blob/c5f1c6a31b20bbb80b4020b753e88cc283aaf197/airflow/models.py#L1479

DAG 的 run_id 将是存储此信息的好地方。

是的,这是可能的 我尝试并为我工作的是

In the following code block, I am trying to show all possible ways to use the configurations passed, directly to different operators

pyspark_task = DataprocSubmitJobOperator(
    task_id="task_0001",
    job=PYSPARK_JOB,
    location=f"{{{{dag_run.conf.get('dataproc_region','{config_data['cluster_location']}')}}}}",
    project_id="{{dag_run.conf['dataproc_project_id']}}",
    gcp_conn_id="{{dag_run.conf.gcp_conn_id}}"
)

所以你可以像这样使用它

"{{dag_run.conf.field_name}}" or "{{dag_run.conf['field_name']}}"

或者 如果你想在配置字段是可选的情况下使用一些默认值,

f"{{{{dag_run.conf.get('field_name', '{local_variable['field_name_0002']}')}}}}"