在 Airflow 2.0 中访问 operator/sensor 之外的宏

Accessing macros outside of an operator/sensor in Airflow 2.0

希望这是一个简单的问题。

我是 Airflow 的菜鸟,我正在尝试用 Airflow DAG 替换我的一个 ETL 作为 POC,但我正在努力做一件基本的事情。

我希望将 DAG 运行 的 execution_date 或 ds 宏注入到外部函数的 SQL 字符串中,这样我就可以动态地 move/aggregate 基于 execution_date 的数据,这对作业重新 运行 和回填很有用。

到目前为止,DAG 的基本结构如下:

def create_db_engine():
    [redacted]
    return engine

def run_query(sql):
    engine = create_db_engine()
    connection = engine.connect()
    data = connection.execute(sql)
    return data

def wait_for_data():
    sensor_query = f'''
    select blah
    from table
    limit 1
    '''
    if run_query(sensor_query).rowcount >= 1:
        return True
    else:
        return False

def run_aggregation():
    agg_query = f'''
    delete from table
    where datefield = '{{ prev_ds }}'::DATE;
    
    insert into table(datefield, metric)
    select date_trunc('day', timefield), sum(metric)
    from sourcetable
    where timefield >= '{{ prev_ds }}'::DATE
    and timefield < '{{ ds }}'::DATE
    group by 1;
    '''
    run_query(agg_query)

@task
def data_sensor():
    PythonSensor(
        task_id='check_data',
        python_callable=wait_for_data(),
    )

@task
def agg_operator(args):
    PythonOperator(
        task_id='agg_data',
        python_callable=run_aggregation()
    )

Summary/notes:

  1. 基本前提是等待某事发生,然后运行一个能够利用执行日期的查询。
  2. 我正在尝试使用 {{}} 宏语法,但它似乎无法在 operator/sensor 调用之外使用。
  3. 我正在使用 SQL 炼金术,因为我使用 IAM 角色链通过 AWS Redshift 进行身份验证,但我找不到使其与 SQLoperators/sensors 一起工作的方法。尽管如果有人对此有解决方案,那将是一个不错的额外答案。
  4. Python 3.9,气流 2.1.2。数据库是 Amazon Redshift。

我尝试了几种不同的方法来使事情正常进行:

kwargs #1 - 根据这里的答案 [ provide_context=True 应该可以通过 kwargs 将变量作为 **kwargs 传递给函数。但这对我不起作用。

def data_sensor():
    PythonSensor(
        task_id='check_data',
        python_callable=wait_for_data(),
        poke_interval=30,
        timeout=3600,
        provide_context=True
    )
...
ds = kwargs['ds']
prev_ds = kwargs['prev_ds']
...
Error
    ds = kwargs['ds']
KeyError: 'ds'

kwargs #2 - 这里的答案 [ 建议通过 templates_dict 变量将您想要的字段添加到模板中。但这也不管用

def data_sensor():
    PythonSensor(
        task_id='check_data',
        python_callable=wait_for_data(),
        poke_interval=30,
        timeout=3600,
        provide_context=True,
        templates_dict={'start_date': '{{ prev_ds }}',
                        'end_date': '{{ ds }}',
                        'next_date': '{{ next_ds }}'},
    )
...
Error
    end_date = kwargs.get(['templates_dict']).get(['ds'])
TypeError: unhashable type: 'list'

所以我的问题是:

  1. 这是怎么回事?这可能吗?
  2. 这是实现我需要的正确范例吗?或者有没有更简单的方法?

这实际上很有可能,而且不是一个坏主意,但您必须通过 JINJA 模板引擎手动 运行 您的字符串(这是 Airflow 在处理传递给运算符的模板化参数时所做的)。

Airflow 会自动为添加到 templated_fields 列表的所有字段执行此操作 - 但由于您的操作员是 Python 代码,因此没有什么可以阻止您手动进行类似处理。

您不应像您尝试的那样使用 PythonOperator。使用 TaskFlow API,@task 会自动用 PythonOperator 包装您的可调用 Python 方法,因此您的任务只是编写正确的 Python 代码 - 甚至无需考虑 Python运算符.

唯一的困难是您需要获取上下文,但这可以通过 get_current_context() 方法轻松实现:Passing arguements using Taskflow API in Airflow 2.0

一旦你有了上下文(这正是包含所有 {{ next_ds }} 和其他上下文变量的内容)你可以简单地获取你的字符串并使用 Jinja 模板处理它,将上下文传递给 JINJA。您可以看到 Airflow 在内部是如何做到的:https://github.com/apache/airflow/blob/932c3b5df5444970484f0ec23589be1820a3270d/airflow/models/baseoperator.py#L1070 - 它有点复杂,因为它处理了 xcom 等几种不同的情况,但您可以将其作为灵感。

除了 Jarek 的回答(确实有效)之外,我发现了一种更直接的方法来完成我需要的事情,不需要 Jinja 模板。事实证明,您可以导入 get_current_context 函数并使用它在函数之间传递作业上下文。

from airflow.operators.python import PythonOperator, get_current_context
...
@task
def data_sensor():
    context = get_current_context()
    PythonSensor(
        task_id='check_data',
        python_callable=wait_for_data(context),
        poke_interval=30,
        timeout=3600
    )
....
def wait_for_data(context):
    end_date = context['ds']
    next_date = context['next_ds']