Apache Airflow - 如何在使用 TriggerDagRunOperator 触发的流中检索运算符外部的 dag_run 数据
Apache Airflow - How to retrieve dag_run data outside an operator in a flow triggered with TriggerDagRunOperator
我设置了两个 DAG,我们称第一个为 orchestrator,第二个为 worker。 Orchestrator 的工作是从 API 中检索一个列表,并针对此列表中的每个元素,使用一些参数触发 worker DAG。
我将这两个工作流分开的原因是我希望能够仅重放失败的 "worker" 个工作流(如果一个失败,我不想重放所有工作实例)。
我能够让事情正常进行,但现在我看到监控有多么困难,因为我的 task_id 对所有人来说都是一样的,所以我决定使用动态 task_id 基于"orchestrator" 工作流程从 API 检索到的值。
但是,我无法从运算符外部的 dag_run 对象中检索值。基本上,我希望它能工作:
with models.DAG('specific_workflow', schedule_interval=None, default_args=default_dag_args) as dag:
name = context['dag_run'].name
hello_world = BashOperator(task_id='hello_{}'.format(name), bash_command="echo Hello {{ dag_run.conf.name }}", dag=dag)
bye = BashOperator(task_id='bye_{}'.format(name), bash_command="echo Goodbye {{ dag_run.conf.name }}", dag=dag)
hello_world >> bye
但是我无法定义这个 "context" 对象。但是,我可以从操作员(例如 PythonOperator 和 BashOperator)访问它。
是否可以在运算符之外检索 dag_run 对象?
我认为目前不太可能。例如,作为 worker 运行 进程的一部分,检索 DAG 时没有提供任何 TaskInstance 上下文,除了在哪里可以找到 DAG:https://github.com/apache/incubator-airflow/blob/f18e2550543e455c9701af0995bc393ee6a97b47/airflow/bin/cli.py#L353
DAG 的 run_id
将是存储此信息的好地方。
是的,这是可能的
我尝试并为我工作的是
In the following code block, I am trying to show all possible ways to use the configurations passed,
directly to different operators
pyspark_task = DataprocSubmitJobOperator(
task_id="task_0001",
job=PYSPARK_JOB,
location=f"{{{{dag_run.conf.get('dataproc_region','{config_data['cluster_location']}')}}}}",
project_id="{{dag_run.conf['dataproc_project_id']}}",
gcp_conn_id="{{dag_run.conf.gcp_conn_id}}"
)
所以你可以像这样使用它
"{{dag_run.conf.field_name}}" or "{{dag_run.conf['field_name']}}"
或者
如果你想在配置字段是可选的情况下使用一些默认值,
f"{{{{dag_run.conf.get('field_name', '{local_variable['field_name_0002']}')}}}}"
我设置了两个 DAG,我们称第一个为 orchestrator,第二个为 worker。 Orchestrator 的工作是从 API 中检索一个列表,并针对此列表中的每个元素,使用一些参数触发 worker DAG。
我将这两个工作流分开的原因是我希望能够仅重放失败的 "worker" 个工作流(如果一个失败,我不想重放所有工作实例)。
我能够让事情正常进行,但现在我看到监控有多么困难,因为我的 task_id 对所有人来说都是一样的,所以我决定使用动态 task_id 基于"orchestrator" 工作流程从 API 检索到的值。
但是,我无法从运算符外部的 dag_run 对象中检索值。基本上,我希望它能工作:
with models.DAG('specific_workflow', schedule_interval=None, default_args=default_dag_args) as dag:
name = context['dag_run'].name
hello_world = BashOperator(task_id='hello_{}'.format(name), bash_command="echo Hello {{ dag_run.conf.name }}", dag=dag)
bye = BashOperator(task_id='bye_{}'.format(name), bash_command="echo Goodbye {{ dag_run.conf.name }}", dag=dag)
hello_world >> bye
但是我无法定义这个 "context" 对象。但是,我可以从操作员(例如 PythonOperator 和 BashOperator)访问它。
是否可以在运算符之外检索 dag_run 对象?
我认为目前不太可能。例如,作为 worker 运行 进程的一部分,检索 DAG 时没有提供任何 TaskInstance 上下文,除了在哪里可以找到 DAG:https://github.com/apache/incubator-airflow/blob/f18e2550543e455c9701af0995bc393ee6a97b47/airflow/bin/cli.py#L353
DAG 的 run_id
将是存储此信息的好地方。
是的,这是可能的 我尝试并为我工作的是
In the following code block, I am trying to show all possible ways to use the configurations passed, directly to different operators
pyspark_task = DataprocSubmitJobOperator(
task_id="task_0001",
job=PYSPARK_JOB,
location=f"{{{{dag_run.conf.get('dataproc_region','{config_data['cluster_location']}')}}}}",
project_id="{{dag_run.conf['dataproc_project_id']}}",
gcp_conn_id="{{dag_run.conf.gcp_conn_id}}"
)
所以你可以像这样使用它
"{{dag_run.conf.field_name}}" or "{{dag_run.conf['field_name']}}"
或者 如果你想在配置字段是可选的情况下使用一些默认值,
f"{{{{dag_run.conf.get('field_name', '{local_variable['field_name_0002']}')}}}}"