我们如何在气流中使用延迟 DAG 分配

How can we use Deferred DAG assignment in airflow

我是 Apache airflow 的新手,正在使用 DAG。 y代码如下。

在输入 json 中,我有一个名为 'sports_category' 的参数。如果它的值为 'football' 那么 football_players 任务需要 运行 如果它的值为 cricket 那么 'cricket_players' 任务 运行s.

import airflow

from airflow import DAG
from airflow.contrib.operators.databricks_operator import DatabricksSubmitRunOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 6, 23)
}

dag = DAG('PLAYERS_DETAILS',default_args=default_args,schedule_interval=None,max_active_runs=5) 

football_players = DatabricksSubmitRunOperator(
    task_id='football_players',
    databricks_conn_id='football_players_details',
    existing_cluster_id='{{ dag_run.conf.clusterId }}',
    libraries= [
        {
        'jar': {{ jar path }}
        }        
        ],
        databricks_retry_limit = 3,
    spark_jar_task={
        'main_class_name': 'football class name1',
        'parameters' : [
            'json ={{ dag_run.conf.json }}'     
        ]
    }
)

cricket_players = DatabricksSubmitRunOperator(
    task_id='cricket_players',
    databricks_conn_id='cricket_players_details',
    existing_cluster_id='{{ dag_run.conf.clusterId }}',
    libraries= [
        {
        'jar': {{ jar path }}
        }        
        ],
        databricks_retry_limit = 3,
    spark_jar_task={
        'main_class_name': 'cricket class name2',
        'parameters' : [
            'json ={{ dag_run.conf.json }}'     
        ]
    }
)

我建议使用 BranchPythonOperator,它将一个函数作为参数,return 根据函数内部编写的逻辑,运行 流程中的下一个任务需要运行。

参考 here for documentation and here 示例 dag。

让我知道你的回复!