Change execution concurrency of Airflow DAG
I want to change the dag_concurrency parameter for one specific Airflow DAG. There appears to be a global dag_concurrency parameter in airflow.cfg, but is it possible to set a different value for different DAGs?

I tried adding a concurrency parameter to the SSHExecuteOperator task in the DAG code, but the concurrency value shown in the DAG details still reflects the standard default (16).
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'email': ['exceptions@airflow.com'],
    'email_on_failure': True,
    'retries': 0
}

# server must be changed to point to the correct environment
sshHookEtl = SSHHook(conn_id='SSH__airflow@myserver')

with DAG(
        'ed_data_quality_20min-v1.6.6',
        default_args=default_args,
        schedule_interval="0,20,40 * * * *",
        dagrun_timeout=timedelta(hours=24)) as dag:

    (
        dag
        >> SSHExecuteOperator(
            task_id='run_remote_ed_data_quality_20min',
            bash_command='bash /opt/scripts/shell/EXEC_ED_DATA_QUALITY_20MIN.sh ',
            ssh_hook=sshHookEtl,
            retries=0,
            concurrency=1,
            dag=dag)
    )
Here are the DAG details:
I found the solution. I was not adding the concurrency parameter in the right place. It should be set directly as an attribute of the DAG object, not on the SSHExecuteOperator task. Here is the new code:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'email': ['exceptions@airflow.com'],
    'email_on_failure': True,
    'retries': 0
}

# server must be changed to point to the correct environment
sshHookEtl = SSHHook(conn_id='SSH__airflow@myserver')

with DAG(
        'ed_data_quality_20min-v1.6.6',
        default_args=default_args,
        schedule_interval="0,20,40 * * * *",
        dagrun_timeout=timedelta(hours=24),
        concurrency=1) as dag:

    (
        dag
        >> SSHExecuteOperator(
            task_id='run_remote_ed_data_quality_20min',
            bash_command='bash /opt/scripts/shell/EXEC_ED_DATA_QUALITY_20MIN.sh ',
            ssh_hook=sshHookEtl,
            retries=0,
            dag=dag)
    )
OK... you can just set concurrency on the DAG object. There is also a task_concurrency argument on BaseOperator. There is no concurrency parameter or field on an SSHExecuteOperator or BaseOperator task.
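As a minimal sketch of the difference between the two settings (assuming an Airflow 1.x version where SSHExecuteOperator is still available in contrib and BaseOperator accepts task_concurrency; the DAG id, task id, and connection id below are hypothetical): concurrency on the DAG caps how many task instances of that DAG run at the same time, while task_concurrency on a single operator caps concurrent instances of just that task.

from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator

# Hypothetical connection id, reused from the question for illustration.
ssh_hook = SSHHook(conn_id='SSH__airflow@myserver')

with DAG(
        'concurrency_example',                 # hypothetical DAG id
        start_date=datetime(2017, 1, 1),
        schedule_interval='0,20,40 * * * *',
        concurrency=1) as dag:                 # DAG-wide cap on running task instances

    SSHExecuteOperator(
        task_id='run_remote_script',           # hypothetical task id
        bash_command='bash /opt/scripts/shell/EXEC_ED_DATA_QUALITY_20MIN.sh ',
        ssh_hook=ssh_hook,
        task_concurrency=1)                    # per-task cap, inherited from BaseOperator

Setting concurrency on the DAG only overrides the global dag_concurrency default from airflow.cfg for that one DAG; the global setting itself is unchanged.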