在气流 Dag 中使用 dag_run 个变量
Using dag_run variables in airflow Dag
我正在尝试使用气流变量来确定是否执行任务。我已经试过了,但没有用:
if '{{ params.year }}' == '{{ params.message }}':
run_this = DummyOperator (
task_id = 'dummy_dag'
)
我希望得到一些帮助让它发挥作用。在气流中还有更好的方法来做这样的事情吗?
我认为解决这个问题的一个好方法是 BranchPythonOperator
根据提供的 DAG 参数动态分支。考虑这个例子:
使用params
向DAG提供参数(也可以从UI完成),在这个例子中:{"enabled": True}
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.operators.python import get_current_context, BranchPythonOperator
@dag(
default_args=default_args,
schedule_interval=None,
start_date=days_ago(1),
catchup=False,
tags=["example"],
params={"enabled": True},
)
def branch_from_dag_params():
def _print_enabled():
context = get_current_context()
enabled = context["params"].get("enabled", False)
print(f"Task id: {context['ti'].task_id}")
print(f"Enabled is: {enabled}")
@task
def task_a():
_print_enabled()
@task
def task_b():
_print_enabled()
定义 BranchPythonOperator
的可调用对象,您将在其中执行条件,return 下一个要执行的任务。您可以从 **kwargs
访问执行上下文变量。还要记住,此运算符应该 return 单个 task_id 或 列表 task_ids跟随下游。这些结果任务应始终直接位于它的下游。
def _get_task_run(ti, **kwargs):
custom_param = kwargs["params"].get("enabled", False)
if custom_param:
return "task_a"
else:
return "task_b"
branch_task = BranchPythonOperator(
task_id="branch_task",
python_callable=_get_task_run,
)
task_a_exec = task_a()
task_b_exec = task_b()
branch_task >> [task_a_exec, task_b_exec]
结果是 task_a 被执行并且 task_b 被 跳过 :
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=branch_from_dag_params
AIRFLOW_CTX_TASK_ID=task_a
Task id: task_a
Enabled is: True
如果这对你有用,请告诉我。
我正在尝试使用气流变量来确定是否执行任务。我已经试过了,但没有用:
if '{{ params.year }}' == '{{ params.message }}':
run_this = DummyOperator (
task_id = 'dummy_dag'
)
我希望得到一些帮助让它发挥作用。在气流中还有更好的方法来做这样的事情吗?
我认为解决这个问题的一个好方法是 BranchPythonOperator
根据提供的 DAG 参数动态分支。考虑这个例子:
使用params
向DAG提供参数(也可以从UI完成),在这个例子中:{"enabled": True}
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.operators.python import get_current_context, BranchPythonOperator
@dag(
default_args=default_args,
schedule_interval=None,
start_date=days_ago(1),
catchup=False,
tags=["example"],
params={"enabled": True},
)
def branch_from_dag_params():
def _print_enabled():
context = get_current_context()
enabled = context["params"].get("enabled", False)
print(f"Task id: {context['ti'].task_id}")
print(f"Enabled is: {enabled}")
@task
def task_a():
_print_enabled()
@task
def task_b():
_print_enabled()
定义 BranchPythonOperator
的可调用对象,您将在其中执行条件,return 下一个要执行的任务。您可以从 **kwargs
访问执行上下文变量。还要记住,此运算符应该 return 单个 task_id 或 列表 task_ids跟随下游。这些结果任务应始终直接位于它的下游。
def _get_task_run(ti, **kwargs):
custom_param = kwargs["params"].get("enabled", False)
if custom_param:
return "task_a"
else:
return "task_b"
branch_task = BranchPythonOperator(
task_id="branch_task",
python_callable=_get_task_run,
)
task_a_exec = task_a()
task_b_exec = task_b()
branch_task >> [task_a_exec, task_b_exec]
结果是 task_a 被执行并且 task_b 被 跳过 :
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=branch_from_dag_params
AIRFLOW_CTX_TASK_ID=task_a
Task id: task_a
Enabled is: True
如果这对你有用,请告诉我。