在一个 DAG 中执行顺序和并发任务
Executing sequential and concurrent tasks within one DAG
我是 Airflow 的新手,有一些关于如何正确 运行 一些任务 并发 和其他 顺序 在一个 DAG 内。
在我的 DAG 中,基本步骤是:刷新数据,运行3 个单独的脚本,部署。这些应用程序中的每一个都 运行 在一个单独的 Docker 容器中。
在下面的例子中,一切都是按顺序进行的,然而,我的objective是刷新数据,然后做这个,做那个,the_other_thing 并行,然后部署。
refresh >> [this, that, the_other_thing] >> deploy
我只想在 [this, that, the_other_thing]
完成后部署,但不清楚这三个中哪一个会最后完成。在一个 DAG 中执行此序列的最佳做法是什么?设置 concurrency=3
并在 中执行 [this, that, the_other_thing]
就足够了吗?任何建议表示赞赏
from builtins import range
from datetime import timedelta, datetime
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from airflow.hooks.base_hook import BaseHook
image = 'myserver.com:8080/my_project:latest'
args = {
'owner': 'Airflow',
'start_date': datetime(2020,01,01),
'depends_on_past': False,
"retries": 2,
'retry_delay': timedelta(minutes=5)
}
conn_foo_db = BaseHook.get_connection('foobar_db')
conn_docker = BaseHook.get_connection('my_registry')
dag = DAG(
dag_id='analysis',
default_args=args,
schedule_interval='0 3 * * *',
dagrun_timeout=timedelta(minutes=180),
max_active_runs=1,
concurrency=1,
tags=['daily']
)
refresh_data = BashOperator(
task_id='refresh_data',
bash_command='docker run '
'-i --rm '
f"-e DB_PASSWORD='{ conn_foo_db.password }' "
f' { image } '
'app=refresh',
dag=dag,
)
this = BashOperator(
task_id='run_app_this',
bash_command='docker run '
'-i --rm '
f"-e DB_PASSWORD='{ conn_foo_db.password }' "
f' { image } '
'app=do_this ',
dag=dag,
)
that = BashOperator(
task_id='run_app_that',
bash_command='docker run '
'-i --rm '
f"-e DB_PASSWORD='{ conn_foo_db.password }' "
f' { image } '
'app=do_that',
dag=dag,
)
the_other_thing = BashOperator(
task_id='run_app_the_other_thing',
bash_command='docker run '
'-i --rm '
f"-e DB_PASSWORD='{ conn_foo_db.password }' "
f' { image } '
'app=do_the_other_thing ',
dag=dag,
)
deploy = BashOperator(
task_id='deploy',
bash_command='docker run '
'-i --rm '
f"-e DB_PASSWORD='{ conn_foo_db.password }' "
f' { image } '
'app=deploy ',
dag=dag,
)
refresh_data >> run_app_this >> run_app_that >> run_app_the_other_thing >> deploy_to_dashboard
if __name__ == "__main__":
dag.cli()
是的,你的假设是正确的。
可能的代码可以是:
tasks_list = ["this", "that", "the_other_thing"]
refresh_data = BashOperator(
task_id='refresh_data_task',
bash_command='docker run '
'-i --rm '
f"-e DB_PASSWORD='{ conn_foo_db.password }' "
f' { image } '
'app=refresh',
dag=dag,
)
deploy = BashOperator(
task_id='deploy_task',
bash_command='docker run '
'-i --rm '
f"-e DB_PASSWORD='{ conn_foo_db.password }' "
f' { image } '
'app=deploy ',
dag=dag,
)
for task in tasks_list:
task_op = BashOperator(
task_id=f'run_{task}_task',
bash_command='docker run '
'-i --rm '
f"-e DB_PASSWORD='{conn_foo_db.password}' "
f' {image} '
f'app=do_{task}',
dag=dag,
)
refresh_data >> task_op >> deploy
由于默认的触发规则是ALL_SUCESS
,deploy
只有在tasks_list
中的所有任务都成功后才会开始运行。
一些注意事项:
- 您多次使用相同的代码,您可能需要考虑创建某种配置文件,其中包含依赖项和设置操作员所需的所有信息,然后使用从该文件构建操作员的工厂方法避免在 DAG 文件中重复代码。
- 避免访问存储在运营商范围之外的 Airflow Metastore 中的连接。这是一种不好的做法。 Airflow 会定期扫描您的 DAG 文件(根据
min_file_process_interval
),这将导致数据库中的大量数据。
我是 Airflow 的新手,有一些关于如何正确 运行 一些任务 并发 和其他 顺序 在一个 DAG 内。
在我的 DAG 中,基本步骤是:刷新数据,运行3 个单独的脚本,部署。这些应用程序中的每一个都 运行 在一个单独的 Docker 容器中。
在下面的例子中,一切都是按顺序进行的,然而,我的objective是刷新数据,然后做这个,做那个,the_other_thing 并行,然后部署。
refresh >> [this, that, the_other_thing] >> deploy
我只想在 [this, that, the_other_thing]
完成后部署,但不清楚这三个中哪一个会最后完成。在一个 DAG 中执行此序列的最佳做法是什么?设置 concurrency=3
并在 [this, that, the_other_thing]
就足够了吗?任何建议表示赞赏
from builtins import range
from datetime import timedelta, datetime
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from airflow.hooks.base_hook import BaseHook
image = 'myserver.com:8080/my_project:latest'
args = {
'owner': 'Airflow',
'start_date': datetime(2020,01,01),
'depends_on_past': False,
"retries": 2,
'retry_delay': timedelta(minutes=5)
}
conn_foo_db = BaseHook.get_connection('foobar_db')
conn_docker = BaseHook.get_connection('my_registry')
dag = DAG(
dag_id='analysis',
default_args=args,
schedule_interval='0 3 * * *',
dagrun_timeout=timedelta(minutes=180),
max_active_runs=1,
concurrency=1,
tags=['daily']
)
refresh_data = BashOperator(
task_id='refresh_data',
bash_command='docker run '
'-i --rm '
f"-e DB_PASSWORD='{ conn_foo_db.password }' "
f' { image } '
'app=refresh',
dag=dag,
)
this = BashOperator(
task_id='run_app_this',
bash_command='docker run '
'-i --rm '
f"-e DB_PASSWORD='{ conn_foo_db.password }' "
f' { image } '
'app=do_this ',
dag=dag,
)
that = BashOperator(
task_id='run_app_that',
bash_command='docker run '
'-i --rm '
f"-e DB_PASSWORD='{ conn_foo_db.password }' "
f' { image } '
'app=do_that',
dag=dag,
)
the_other_thing = BashOperator(
task_id='run_app_the_other_thing',
bash_command='docker run '
'-i --rm '
f"-e DB_PASSWORD='{ conn_foo_db.password }' "
f' { image } '
'app=do_the_other_thing ',
dag=dag,
)
deploy = BashOperator(
task_id='deploy',
bash_command='docker run '
'-i --rm '
f"-e DB_PASSWORD='{ conn_foo_db.password }' "
f' { image } '
'app=deploy ',
dag=dag,
)
refresh_data >> run_app_this >> run_app_that >> run_app_the_other_thing >> deploy_to_dashboard
if __name__ == "__main__":
dag.cli()
是的,你的假设是正确的。 可能的代码可以是:
tasks_list = ["this", "that", "the_other_thing"]
refresh_data = BashOperator(
task_id='refresh_data_task',
bash_command='docker run '
'-i --rm '
f"-e DB_PASSWORD='{ conn_foo_db.password }' "
f' { image } '
'app=refresh',
dag=dag,
)
deploy = BashOperator(
task_id='deploy_task',
bash_command='docker run '
'-i --rm '
f"-e DB_PASSWORD='{ conn_foo_db.password }' "
f' { image } '
'app=deploy ',
dag=dag,
)
for task in tasks_list:
task_op = BashOperator(
task_id=f'run_{task}_task',
bash_command='docker run '
'-i --rm '
f"-e DB_PASSWORD='{conn_foo_db.password}' "
f' {image} '
f'app=do_{task}',
dag=dag,
)
refresh_data >> task_op >> deploy
由于默认的触发规则是ALL_SUCESS
,deploy
只有在tasks_list
中的所有任务都成功后才会开始运行。
一些注意事项:
- 您多次使用相同的代码,您可能需要考虑创建某种配置文件,其中包含依赖项和设置操作员所需的所有信息,然后使用从该文件构建操作员的工厂方法避免在 DAG 文件中重复代码。
- 避免访问存储在运营商范围之外的 Airflow Metastore 中的连接。这是一种不好的做法。 Airflow 会定期扫描您的 DAG 文件(根据
min_file_process_interval
),这将导致数据库中的大量数据。