如何 运行 使用条件任务对 DAG 进行气流处理
How to run airflow DAG with conditional tasks
总共有 6 个任务是 there.These 任务需要根据一个字段的(flag_value)输入值来执行 json。
如果 flag_value 的值为真,那么所有任务都需要以这样的方式执行,
首先任务 1 然后并行到(任务 2 和任务 3 一起),并行到任务 4,并行到任务 5。
一旦所有这些完成,然后是 task6。
由于我是气流和 DAG 的新手,我不知道如何 运行 这种情况。
如果 flag_value 的值为 false,则顺序仅按顺序排列
task_1 >> task_4 >> task_5 >> task_6.
下面是我的 DAG 代码。
from airflow import DAG
from datetime import datetime
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False
}
dag = DAG('DAG_FOR_TEST',default_args=default_args,schedule_interval=None,max_active_runs=3, start_date=datetime(2020, 7, 8))
#################### CREATE TASK #####################################
task_1 = DatabricksSubmitRunOperator(
task_id='task_1',
databricks_conn_id='connection_id_details',
existing_cluster_id='{{ dag_run.conf.clusterId }}',
libraries= [
{
'jar': 'dbfs:/task_1/task_1.jar'
}
],
spark_jar_task={
'main_class_name': 'com.task_1.driver.TestClass1',
'parameters' : [
'{{ dag_run.conf.json }}'
]
}
)
task_2 = DatabricksSubmitRunOperator(
task_id='task_2',
databricks_conn_id='connection_id_details',
existing_cluster_id='{{ dag_run.conf.clusterId }}',
libraries= [
{
'jar': 'dbfs:/task_2/task_2.jar'
}
],
spark_jar_task={
'main_class_name': 'com.task_2.driver.TestClass2',
'parameters' : [
'{{ dag_run.conf.json }}'
]
}
)
task_3 = DatabricksSubmitRunOperator(
task_id='task_3',
databricks_conn_id='connection_id_details',
existing_cluster_id='{{ dag_run.conf.clusterId }}',
libraries= [
{
'jar': 'dbfs:/task_3/task_3.jar'
}
],
spark_jar_task={
'main_class_name': 'com.task_3.driver.TestClass3',
'parameters' : [
'{{ dag_run.conf.json }}'
]
}
)
task_4 = DatabricksSubmitRunOperator(
task_id='task_4',
databricks_conn_id='connection_id_details',
existing_cluster_id='{{ dag_run.conf.clusterId }}',
libraries= [
{
'jar': 'dbfs:/task_4/task_4.jar'
}
],
spark_jar_task={
'main_class_name': 'com.task_4.driver.TestClass4',
'parameters' : [
'{{ dag_run.conf.json }}'
]
}
)
task_5 = DatabricksSubmitRunOperator(
task_id='task_5',
databricks_conn_id='connection_id_details',
existing_cluster_id='{{ dag_run.conf.clusterId }}',
libraries= [
{
'jar': 'dbfs:/task_5/task_5.jar'
}
],
spark_jar_task={
'main_class_name': 'com.task_5.driver.TestClass5',
'parameters' : [
'json ={{ dag_run.conf.json }}'
]
}
)
task_6 = DatabricksSubmitRunOperator(
task_id='task_6',
databricks_conn_id='connection_id_details',
existing_cluster_id='{{ dag_run.conf.clusterId }}',
libraries= [
{
'jar': 'dbfs:/task_6/task_6.jar'
}
],
spark_jar_task={
'main_class_name': 'com.task_6.driver.TestClass6',
'parameters' : ['{{ dag_run.conf.json }}'
]
}
)
flag_value='{{ dag_run.conf.json.flag_value }}'
#################### ORDER OF OPERATORS ###########################
if flag_value == 'true':
task_1.dag = dag
task_2.dag = dag
task_3.dag = dag
task_4.dag = dag
task_5.dag = dag
task_6.dag = dag
task_1 >> [task_2 , task_3] >> [task_4] >> [task_5] >> task_6 // Not sure correct
else:
task_1.dag = dag
task_4.dag = dag
task_5.dag = dag
task_6.dag = dag
task_1 >> task_4 >> task_5 >> task_6
首先,依赖关系不正确,这应该可行:
task_1 >> [task_2 , task_3] >> task_4 >> task_5 >> task_6
无法使用 list_1 >> list_2
对任务进行排序,但有辅助方法可以提供此功能,请参阅:cross_downstream。
对于分支,您可以使用 BranchPythonOperator
来更改 trigger rules 任务。不确定以下代码,它可能有小错误,但这里的想法是可行的。
task_4.trigger_rule = "none_failed"
dummy = DummyOperator(task_id="dummy", dag=dag)
branch = BranchPythonOperator(
task_id="branch",
# jinja template returns string "True" or "False"
python_callable=lambda f: ["task_2" , "task_3"] if f == "True" else "dummy",
op_kwargs={"f": flag_value},
dag=dag)
task_1 >> branch
branch >> [task_2 , task_3, dummy] >> task_4
task_4 >> task_5 >> task_6
可能有更好的方法。
总共有 6 个任务是 there.These 任务需要根据一个字段的(flag_value)输入值来执行 json。 如果 flag_value 的值为真,那么所有任务都需要以这样的方式执行, 首先任务 1 然后并行到(任务 2 和任务 3 一起),并行到任务 4,并行到任务 5。 一旦所有这些完成,然后是 task6。 由于我是气流和 DAG 的新手,我不知道如何 运行 这种情况。
如果 flag_value 的值为 false,则顺序仅按顺序排列
task_1 >> task_4 >> task_5 >> task_6.
下面是我的 DAG 代码。
from airflow import DAG
from datetime import datetime
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False
}
dag = DAG('DAG_FOR_TEST',default_args=default_args,schedule_interval=None,max_active_runs=3, start_date=datetime(2020, 7, 8))
#################### CREATE TASK #####################################
task_1 = DatabricksSubmitRunOperator(
task_id='task_1',
databricks_conn_id='connection_id_details',
existing_cluster_id='{{ dag_run.conf.clusterId }}',
libraries= [
{
'jar': 'dbfs:/task_1/task_1.jar'
}
],
spark_jar_task={
'main_class_name': 'com.task_1.driver.TestClass1',
'parameters' : [
'{{ dag_run.conf.json }}'
]
}
)
task_2 = DatabricksSubmitRunOperator(
task_id='task_2',
databricks_conn_id='connection_id_details',
existing_cluster_id='{{ dag_run.conf.clusterId }}',
libraries= [
{
'jar': 'dbfs:/task_2/task_2.jar'
}
],
spark_jar_task={
'main_class_name': 'com.task_2.driver.TestClass2',
'parameters' : [
'{{ dag_run.conf.json }}'
]
}
)
task_3 = DatabricksSubmitRunOperator(
task_id='task_3',
databricks_conn_id='connection_id_details',
existing_cluster_id='{{ dag_run.conf.clusterId }}',
libraries= [
{
'jar': 'dbfs:/task_3/task_3.jar'
}
],
spark_jar_task={
'main_class_name': 'com.task_3.driver.TestClass3',
'parameters' : [
'{{ dag_run.conf.json }}'
]
}
)
task_4 = DatabricksSubmitRunOperator(
task_id='task_4',
databricks_conn_id='connection_id_details',
existing_cluster_id='{{ dag_run.conf.clusterId }}',
libraries= [
{
'jar': 'dbfs:/task_4/task_4.jar'
}
],
spark_jar_task={
'main_class_name': 'com.task_4.driver.TestClass4',
'parameters' : [
'{{ dag_run.conf.json }}'
]
}
)
task_5 = DatabricksSubmitRunOperator(
task_id='task_5',
databricks_conn_id='connection_id_details',
existing_cluster_id='{{ dag_run.conf.clusterId }}',
libraries= [
{
'jar': 'dbfs:/task_5/task_5.jar'
}
],
spark_jar_task={
'main_class_name': 'com.task_5.driver.TestClass5',
'parameters' : [
'json ={{ dag_run.conf.json }}'
]
}
)
task_6 = DatabricksSubmitRunOperator(
task_id='task_6',
databricks_conn_id='connection_id_details',
existing_cluster_id='{{ dag_run.conf.clusterId }}',
libraries= [
{
'jar': 'dbfs:/task_6/task_6.jar'
}
],
spark_jar_task={
'main_class_name': 'com.task_6.driver.TestClass6',
'parameters' : ['{{ dag_run.conf.json }}'
]
}
)
flag_value='{{ dag_run.conf.json.flag_value }}'
#################### ORDER OF OPERATORS ###########################
if flag_value == 'true':
task_1.dag = dag
task_2.dag = dag
task_3.dag = dag
task_4.dag = dag
task_5.dag = dag
task_6.dag = dag
task_1 >> [task_2 , task_3] >> [task_4] >> [task_5] >> task_6 // Not sure correct
else:
task_1.dag = dag
task_4.dag = dag
task_5.dag = dag
task_6.dag = dag
task_1 >> task_4 >> task_5 >> task_6
首先,依赖关系不正确,这应该可行:
task_1 >> [task_2 , task_3] >> task_4 >> task_5 >> task_6
无法使用 list_1 >> list_2
对任务进行排序,但有辅助方法可以提供此功能,请参阅:cross_downstream。
对于分支,您可以使用 BranchPythonOperator
来更改 trigger rules 任务。不确定以下代码,它可能有小错误,但这里的想法是可行的。
task_4.trigger_rule = "none_failed"
dummy = DummyOperator(task_id="dummy", dag=dag)
branch = BranchPythonOperator(
task_id="branch",
# jinja template returns string "True" or "False"
python_callable=lambda f: ["task_2" , "task_3"] if f == "True" else "dummy",
op_kwargs={"f": flag_value},
dag=dag)
task_1 >> branch
branch >> [task_2 , task_3, dummy] >> task_4
task_4 >> task_5 >> task_6
可能有更好的方法。