为什么此获取 Airflow 上下文的代码在 DAG 导入时获取 运行?

Why does this code to get Airflow context get run on DAG import?

我有一个 Airflow DAG,我需要从 Airflow 上下文中获取触发 DAG 的参数。

以前,我有在 DAG 步骤中获取这些参数的代码(我使用的是来自 Airflow 2 的任务流 API)——类似于此:

from typing import Dict, Any, List
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.utils.dates import days_ago

default_args = {"owner": "airflow"}

@dag(
    default_args=default_args,
    start_date=days_ago(1),
    schedule_interval=None,
    tags=["my_pipeline"],
)
def my_pipeline():
    @task(multiple_outputs=True)
    def get_params() -> Dict[str, Any]:
        context = get_current_context()
        params = context["params"]
        assert isinstance(params, dict)
        return params

    params = get_params()

pipeline = my_pipeline()

这按预期工作。

但是,我需要分几步获取这些参数,所以我认为移动代码以将它们放入全局范围内的单独函数中是个好主意,如下所示:

# ...
from airflow.operators.python import get_current_context

# other top-level code here

def get_params() -> Dict[str, Any]:
    context = get_current_context()
    params = context["params"]
    return params

@dag(...)
def my_pipeline():
    @task()
    def get_data():
        params = get_params()

    # other DAG tasks here
    get_data()

pipeline = my_pipeline()

现在,这会在 DAG 导入时中断,出现以下错误(名称更改为与上面的示例匹配):

Broken DAG: [/home/airflow/gcs/dags/my_pipeline.py] Traceback (most recent call last):
  File "/home/airflow/gcs/dags/my_pipeline.py", line 26, in get_params
    context = get_context()
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 467, in get_context
    raise AirflowException(
airflow.exceptions.AirflowException: Current context was requested but no context was found! Are you running within an airflow task?

我明白了错误的意思以及如何修复它(移动代码以将上下文返回到 @task 中)。但我的问题是 - 为什么错误会在 DAG 导入时出现?

get_params 不会在其他任务之外的任何地方被调用,并且这些任务显然不是 运行 直到 DAG 运行s。那么,为什么在导入 DAG 时 get_params 运行 中的代码完全正确?

在这一点上,我想了解这一点只是因为这个错误出现时出现的事实打破了我对 Python 模块如何在导入时进行评估的理解。在函数为 运行 之前,函数内的代码不应 运行,并且在 运行 之前可能出现的唯一错误是 SyntaxError(可能还有其他一些核心错误我现在不记得了)。

Airflow 是在施展某种特殊的魔法,还是我遗漏了一些更简单的事情?

我正在 运行ning Airflow 2.1.2 由 Google Cloud Composer 1.17.2 管理。

很遗憾,我无法重现您的问题。下面的类似代码解析、渲染 DAG,并在 Airflow 2.0、2,1 和 2.2 上成功完成:

from datetime import datetime
from typing import Any, Dict

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context


def get_params() -> Dict[str, Any]:
    context = get_current_context()
    params = context["params"]
    return params


@dag(
    dag_id="get_current_context_test",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    params={"my_param": "param_value"},
)
def my_pipeline():
    @task()
    def get_data():
        params = get_params()
        print(params)

    get_data()


pipeline = my_pipeline()

任务日志片段:

但是context对象可以直接在任务装饰函数中访问。您可以更新任务签名以包含 params=None 的 arg(使用默认值以便文件在没有 TypeError 异常的情况下进行解析),然后对该 arg 应用您需要的任何逻辑。这也可以用 tidag_run 等来完成。也许这有帮助?

@dag(
    dag_id="get_current_context_test",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    params={"my_param": "param_value"},
)
def my_pipeline():
    @task()
    def get_data(params=None):
        print(params)

    get_data()


pipeline = my_pipeline()