运行 airflow 任务在循环中的任务之后,而不是在循环中的所有任务之后
Run an airflow task after a task in a loop, not after all tasks in a loop
假设我们有这些任务:
for endpoint in ENDPOINTS:
latest_only = LatestOnlyOperator(
task_id=f'{endpoint.name}_latest_only',
)
s3 = SnowflakeQOperator(
task_id=f'{endpoint.name}_to_S3',
boostr_conn_id='boostr_default',
s3_conn_id='aws_default',
partition=endpoint.partition,
endpoint=endpoint
)
short_circuit = ShortCircuitOperator(
task_id=f"short_circuit_missing_{endpoint.name}",
op_kwargs={'endpoint_to_check': endpoint, 'aws_conn_id': 'aws_default'},
python_callable=check_file_exists,
provide_context=True
)
s3 >> short_circuit
假设我想在 nbc_to_s3
之后向 运行 添加一项任务,这是 s3
任务中的“{endpoint.name}”任务之一。
我们正在使用 'name' 方法导入包含多个 class 的端点:
@property
def name(self) -> str:
return 'nbc'
我试过像这样将它添加到循环之外:
nbc_to_s3 >> new_task
但这不起作用,因为 'nbc_to_s3' 未定义
您可以在循环中应用一些逻辑来为 new_task
设置新的依赖关系,就像这样(为快速模型道歉):
from airflow.decorators import dag
from airflow.operators.dummy import DummyOperator
from datetime import datetime
ENDPOINTS = ["nbc", "cbs", "bravo", "espn"]
DEFAULT_ARGS = dict(owner="airflow", start_date=datetime(2021, 6, 9))
DAG_ARGS = dict(schedule_interval=None, default_args=DEFAULT_ARGS, catchup=False)
@dag(**DAG_ARGS)
def run_task_after_loop():
for endpoint in ENDPOINTS:
s3 = DummyOperator(
task_id=f"{endpoint}_to_S3",
)
short_circuit = DummyOperator(
task_id=f"short_circuit_missing_{endpoint}",
)
s3 >> short_circuit
if endpoint == "nbc":
new_task = DummyOperator(task_id=f"new_task_{endpoint}")
s3 >> new_task
dag = run_task_after_loop()
假设我们有这些任务:
for endpoint in ENDPOINTS:
latest_only = LatestOnlyOperator(
task_id=f'{endpoint.name}_latest_only',
)
s3 = SnowflakeQOperator(
task_id=f'{endpoint.name}_to_S3',
boostr_conn_id='boostr_default',
s3_conn_id='aws_default',
partition=endpoint.partition,
endpoint=endpoint
)
short_circuit = ShortCircuitOperator(
task_id=f"short_circuit_missing_{endpoint.name}",
op_kwargs={'endpoint_to_check': endpoint, 'aws_conn_id': 'aws_default'},
python_callable=check_file_exists,
provide_context=True
)
s3 >> short_circuit
假设我想在 nbc_to_s3
之后向 运行 添加一项任务,这是 s3
任务中的“{endpoint.name}”任务之一。
我们正在使用 'name' 方法导入包含多个 class 的端点:
@property
def name(self) -> str:
return 'nbc'
我试过像这样将它添加到循环之外:
nbc_to_s3 >> new_task
但这不起作用,因为 'nbc_to_s3' 未定义
您可以在循环中应用一些逻辑来为 new_task
设置新的依赖关系,就像这样(为快速模型道歉):
from airflow.decorators import dag
from airflow.operators.dummy import DummyOperator
from datetime import datetime
ENDPOINTS = ["nbc", "cbs", "bravo", "espn"]
DEFAULT_ARGS = dict(owner="airflow", start_date=datetime(2021, 6, 9))
DAG_ARGS = dict(schedule_interval=None, default_args=DEFAULT_ARGS, catchup=False)
@dag(**DAG_ARGS)
def run_task_after_loop():
for endpoint in ENDPOINTS:
s3 = DummyOperator(
task_id=f"{endpoint}_to_S3",
)
short_circuit = DummyOperator(
task_id=f"short_circuit_missing_{endpoint}",
)
s3 >> short_circuit
if endpoint == "nbc":
new_task = DummyOperator(task_id=f"new_task_{endpoint}")
s3 >> new_task
dag = run_task_after_loop()