运行 airflow 任务在循环中的任务之后,而不是在循环中的所有任务之后

Run an airflow task after a task in a loop, not after all tasks in a loop

假设我们有这些任务:

for endpoint in ENDPOINTS:
    latest_only = LatestOnlyOperator(
        task_id=f'{endpoint.name}_latest_only',
    )

    s3 = SnowflakeQOperator(
        task_id=f'{endpoint.name}_to_S3',
        boostr_conn_id='boostr_default',
        s3_conn_id='aws_default',
        partition=endpoint.partition,
        endpoint=endpoint
    )

    short_circuit = ShortCircuitOperator(
        task_id=f"short_circuit_missing_{endpoint.name}",
        op_kwargs={'endpoint_to_check': endpoint, 'aws_conn_id': 'aws_default'},
        python_callable=check_file_exists,
        provide_context=True
    )

     s3 >> short_circuit

假设我想在 nbc_to_s3 之后向 运行 添加一项任务,这是 s3 任务中的“{endpoint.name}”任务之一。

我们正在使用 'name' 方法导入包含多个 class 的端点:

@property
def name(self) -> str:
    return 'nbc'

我试过像这样将它添加到循环之外: nbc_to_s3 >> new_task 但这不起作用,因为 'nbc_to_s3' 未定义

您可以在循环中应用一些逻辑来为 new_task 设置新的依赖关系,就像这样(为快速模型道歉):

from airflow.decorators import dag
from airflow.operators.dummy import DummyOperator

from datetime import datetime

ENDPOINTS = ["nbc", "cbs", "bravo", "espn"]
DEFAULT_ARGS = dict(owner="airflow", start_date=datetime(2021, 6, 9))
DAG_ARGS = dict(schedule_interval=None, default_args=DEFAULT_ARGS, catchup=False)


@dag(**DAG_ARGS)
def run_task_after_loop():
    for endpoint in ENDPOINTS:
        s3 = DummyOperator(
            task_id=f"{endpoint}_to_S3",
        )

        short_circuit = DummyOperator(
            task_id=f"short_circuit_missing_{endpoint}",
        )

        s3 >> short_circuit

        if endpoint == "nbc":
            new_task = DummyOperator(task_id=f"new_task_{endpoint}")

            s3 >> new_task


dag = run_task_after_loop()