我们如何在 Airflow 中使用延迟 DAG 分配
How can we use Deferred DAG assignment in airflow
我是 Apache Airflow 的新手,正在使用 DAG。我的代码如下。
在输入 JSON 中,我有一个名为 'sports_category' 的参数。如果它的值为 'football',那么需要运行 football_players 任务;如果它的值为 'cricket',那么运行 cricket_players 任务。
# DAG 'PLAYERS_DETAILS': runs exactly one of two Databricks jobs per run,
# chosen by the 'sports_category' key in the trigger's JSON conf
# (dag_run.conf), via a BranchPythonOperator.
import airflow
from airflow import DAG
from airflow.contrib.operators.databricks_operator import DatabricksSubmitRunOperator
from airflow.operators.python_operator import BranchPythonOperator
# Was missing in the original: datetime(...) below raised NameError at parse time.
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 6, 23)
}

dag = DAG(
    'PLAYERS_DETAILS',
    default_args=default_args,
    schedule_interval=None,  # trigger-only DAG; conf comes from the trigger call
    max_active_runs=5,
)


def _choose_sport(**context):
    """Return the task_id to follow, based on dag_run.conf['sports_category'].

    Raises ValueError for an unknown/missing category so a bad trigger
    fails loudly instead of silently skipping both branches.
    """
    category = context['dag_run'].conf.get('sports_category')
    if category == 'football':
        return 'football_players'
    if category == 'cricket':
        return 'cricket_players'
    raise ValueError("unknown sports_category: %r" % (category,))


choose_sport = BranchPythonOperator(
    task_id='choose_sport',
    python_callable=_choose_sport,
    provide_context=True,  # Airflow 1.x: pass dag_run etc. into **context
    dag=dag,  # original tasks were never attached to the DAG
)

football_players = DatabricksSubmitRunOperator(
    task_id='football_players',
    databricks_conn_id='football_players_details',
    existing_cluster_id='{{ dag_run.conf.clusterId }}',
    # NOTE(review): original had the invalid literal {{ jar path }} (syntax
    # error). The jar location is now templated from the trigger conf —
    # confirm the conf key name used by the caller.
    libraries=[{'jar': '{{ dag_run.conf.jarPath }}'}],
    databricks_retry_limit=3,
    spark_jar_task={
        'main_class_name': 'football class name1',
        'parameters': [
            'json ={{ dag_run.conf.json }}'
        ]
    },
    dag=dag,
)

cricket_players = DatabricksSubmitRunOperator(
    task_id='cricket_players',
    databricks_conn_id='cricket_players_details',
    existing_cluster_id='{{ dag_run.conf.clusterId }}',
    # NOTE(review): same invalid {{ jar path }} literal replaced — see above.
    libraries=[{'jar': '{{ dag_run.conf.jarPath }}'}],
    databricks_retry_limit=3,
    spark_jar_task={
        'main_class_name': 'cricket class name2',
        'parameters': [
            'json ={{ dag_run.conf.json }}'
        ]
    },
    dag=dag,
)

# Only the branch returned by _choose_sport runs; the other is skipped.
choose_sport >> [football_players, cricket_players]
我是 Apache Airflow 的新手,正在使用 DAG。我的代码如下。
在输入 JSON 中,我有一个名为 'sports_category' 的参数。如果它的值为 'football',那么需要运行 football_players 任务;如果它的值为 'cricket',那么运行 cricket_players 任务。
# DAG 'PLAYERS_DETAILS': two Databricks submit-run tasks whose cluster and
# job JSON are templated from the trigger conf (dag_run.conf).
import airflow
from airflow import DAG
from airflow.contrib.operators.databricks_operator import DatabricksSubmitRunOperator
# Was missing in the original: datetime(...) below raised NameError at parse time.
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 6, 23)
}

dag = DAG(
    'PLAYERS_DETAILS',
    default_args=default_args,
    schedule_interval=None,  # trigger-only DAG; conf comes from the trigger call
    max_active_runs=5,
)

football_players = DatabricksSubmitRunOperator(
    task_id='football_players',
    databricks_conn_id='football_players_details',
    existing_cluster_id='{{ dag_run.conf.clusterId }}',
    # NOTE(review): original had the invalid literal {{ jar path }} (syntax
    # error). The jar location is now templated from the trigger conf —
    # confirm the conf key name used by the caller.
    libraries=[{'jar': '{{ dag_run.conf.jarPath }}'}],
    databricks_retry_limit=3,
    spark_jar_task={
        'main_class_name': 'football class name1',
        'parameters': [
            'json ={{ dag_run.conf.json }}'
        ]
    },
    dag=dag,  # original tasks were never attached to the DAG
)

cricket_players = DatabricksSubmitRunOperator(
    task_id='cricket_players',
    databricks_conn_id='cricket_players_details',
    existing_cluster_id='{{ dag_run.conf.clusterId }}',
    # NOTE(review): same invalid {{ jar path }} literal replaced — see above.
    libraries=[{'jar': '{{ dag_run.conf.jarPath }}'}],
    databricks_retry_limit=3,
    spark_jar_task={
        'main_class_name': 'cricket class name2',
        'parameters': [
            'json ={{ dag_run.conf.json }}'
        ]
    },
    dag=dag,
)