How to dynamically generate downstream list from task in airflow

I have a main task that keeps its logic in this function.

I'm not quite sure how to do this. Maybe I need another task in between? Any help is appreciated. Thanks!

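If I understand correctly, you have already created multiple tasks, but you need to decide dynamically which of them will run downstream. If that is the case, you can safely use BranchPythonOperator: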

It derives the PythonOperator and expects a Python function that returns a single task_id or list of task_ids to follow. The task_id(s) returned should point to a task directly downstream from {self}. All other "branches" or directly downstream tasks are marked with a state of skipped so that these paths can't move forward. The skipped states are propagated downstream to allow for the DAG state to fill up and the DAG run's state to be inferred.
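As a rough mental model of the rule quoted above, here is a plain-Python sketch. It is not Airflow's actual implementation, and the function name `split_downstream` is made up for illustration. The idea is that the callable's return value partitions the directly downstream tasks into those that run and those that get marked as skipped:

```python
from typing import Iterable, List, Tuple, Union


def split_downstream(
    direct_downstream: Iterable[str],
    returned: Union[str, Iterable[str]],
) -> Tuple[List[str], List[str]]:
    """Partition directly downstream task_ids into (followed, skipped)."""
    # The callable may return a single task_id or a list of task_ids.
    chosen = {returned} if isinstance(returned, str) else set(returned)
    direct = list(direct_downstream)
    unknown = chosen - set(direct)
    if unknown:
        # Returned ids are expected to be directly downstream of the branch task.
        raise ValueError(f"not directly downstream: {sorted(unknown)}")
    followed = [t for t in direct if t in chosen]
    skipped = [t for t in direct if t not in chosen]
    return followed, skipped


followed, skipped = split_downstream(
    ["branch_a", "branch_b", "branch_c", "branch_d"],
    ["branch_b", "branch_c"],
)
print(followed)  # ['branch_b', 'branch_c']
print(skipped)   # ['branch_a', 'branch_d']
```

In a real DAG run the skipped state then propagates further downstream, which this sketch does not model.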

Consider the following, based on the example DAG distributed with Airflow:

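# Imports and default_args were omitted from the original snippet; these are
# the Airflow 2.0/2.1-era paths (assumed to match the example's version).
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.edgemodifier import Label

args = {"owner": "airflow"}  # placeholder default_args (assumption)

with DAG(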
    dag_id="branch_multiple_tasks",
    default_args=args,
    start_date=days_ago(1),
    schedule_interval="@daily",
    tags=["example"],
) as dag:

    run_this_first = DummyOperator(
        task_id="run_this_first",
    )

    options = ["branch_a", "branch_b", "branch_c", "branch_d"]

    branching = BranchPythonOperator(
        task_id="branching",
        python_callable=lambda: options[1:3],
    )
    run_this_first >> branching

    join = DummyOperator(
        task_id="join",
        trigger_rule="none_failed_or_skipped",
    )

    for option in options:
        t = DummyOperator(
            task_id=option,
        )

        dummy_follow = DummyOperator(
            task_id="follow_" + option,
        )

        # Label is optional here, but it can help identify more complex branches
        branching >> Label(option) >> t >> dummy_follow >> join

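In this example, the python_callable passed to the branching task is hard-coded to return ['branch_b', 'branch_c']. You can supply your own callable and return a list of task_ids as strings based on any condition. As long as you return the expected format, you could even use your get_campaign_active function. It might be cleaner to create a new function that does an xcom_pull from the previous task. I guess it depends on your needs.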
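A minimal sketch of that last suggestion, under several assumptions that are not in the original question: an upstream task with the hypothetical task_id "get_campaign_active" pushes a dict to XCom mapping each branch task_id to a boolean, and "branch_a" serves as an arbitrary default branch. The function name `choose_branches` is also made up. The callable relies only on `ti.xcom_pull(task_ids=...)`; in Airflow 2 the task instance should be passed in from the context when the callable declares a `ti` parameter.

```python
from typing import Dict, List, Optional


def choose_branches(ti) -> List[str]:
    """Return the task_ids to follow, based on an upstream XCom value."""
    # Assumed payload shape, e.g. {"branch_a": False, "branch_b": True}.
    active: Optional[Dict[str, bool]] = ti.xcom_pull(task_ids="get_campaign_active")
    selected = [task_id for task_id, is_active in (active or {}).items() if is_active]
    # Fall back to one known branch so the callable always names a task to
    # follow; the choice of "branch_a" is arbitrary for this sketch.
    return selected or ["branch_a"]


# Wiring inside the DAG from the example (kept as a comment so the function
# above can be imported and tested without Airflow installed):
#
# branching = BranchPythonOperator(
#     task_id="branching",
#     python_callable=choose_branches,
# )
```

Keeping the decision in its own function means it can be unit-tested with a stub object that exposes `xcom_pull`, independently of the scheduler.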

Graph view: (image not included)

Let me know if that worked for you!