Dependencies between tasks generated by a for loop in Airflow
How do I create dependencies between tasks when I generate all the operators in a for loop?
Example:
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# One entry per table; each entry becomes one task.
tables = {
    'test_1': {
        'from_conn': 'c.from_conn',
        'to_conn': 'c.to_conn'
    },
    'test_2': {
        'from_conn': 'c.from_conn',
        'to_conn': 'c.to_conn'
    },
    'test_3': {
        'from_conn': 'c.from_conn',
        'to_conn': 'c.to_conn'
    },
}

default_args = {
    "depends_on_past": False,
    "start_date": datetime(2021, 4, 13),
    "max_active_runs": 1,
}

dag = DAG(
    'etl',
    default_args=default_args,
    schedule_interval='30 3 * * 0',
    catchup=False,
    concurrency=1,
)


def run():
    print('Running ETL')


# Each iteration creates one task, but no dependencies are set between them.
for table, args in tables.items():
    task_name = table
    task_name = PythonOperator(
        task_id=table,
        dag=dag,
        python_callable=run,
        provide_context=False,
    )
How can I declare the task execution order, such as test_1 >> test_2 >> test_3, without getting an error?
Here is a simple example of the same idea, although it uses the TaskFlow API instead of PythonOperator:
from datetime import datetime

from airflow.decorators import dag, task
from airflow.models.baseoperator import chain


@dag(start_date=datetime(2022, 1, 1), schedule_interval=None)
def tasks_in_loop():
    tasks = []

    for i in range(3):
        # Give every generated task a unique task_id.
        @task(task_id=f"task_{i}")
        def run():
            ...

        t = run()
        tasks.append(t)

    # Link the collected tasks in order: task_0 >> task_1 >> task_2.
    chain(*tasks)


dag = tasks_in_loop()
For reference, see the documentation on the chain() method.
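If you would rather set the dependencies inside the loop itself, one option is to remember the task created in the previous iteration and link it to the current one. The sketch below illustrates only that loop logic and runs without Airflow: StubTask is a hypothetical stand-in (not an Airflow class) that imitates just the >> behaviour of an operator, and the created dictionary is likewise an illustrative name. It assumes the tasks should run in the insertion order of the tables dictionary.

```python
from typing import Dict, List, Optional


class StubTask:
    """Hypothetical stand-in for an Airflow operator; it only models `>>`."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.downstream: List[str] = []

    def __rshift__(self, other: "StubTask") -> "StubTask":
        # Record the edge and return the right-hand task so a >> b >> c chains.
        self.downstream.append(other.task_id)
        return other


# Same keys as in the question; the connection settings are omitted here.
tables: Dict[str, dict] = {"test_1": {}, "test_2": {}, "test_3": {}}

created: Dict[str, StubTask] = {}
previous: Optional[StubTask] = None

for table in tables:
    # In a real DAG file this line would build the PythonOperator instead.
    current = StubTask(task_id=table)
    created[table] = current

    # Link the task from the previous iteration to the one just created.
    if previous is not None:
        previous >> current
    previous = current

for task_id, stub in created.items():
    print(task_id, "->", stub.downstream)
```

In an actual DAG file the same loop should work with the PythonOperator from the question in place of StubTask, since Airflow operators accept >> for setting a downstream task. Collecting the operators in a list and passing them to chain(*tasks), as in the example above, is an equivalent alternative.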