在 Airflow 2.0 中访问 operator/sensor 之外的宏
Accessing macros outside of an operator/sensor in Airflow 2.0
希望这是一个简单的问题。
我是 Airflow 的菜鸟,我正在尝试用 Airflow DAG 替换我的一个 ETL 作为 POC,但我正在努力做一件基本的事情。
我希望将 DAG 运行 的 execution_date 或 ds 宏注入到外部函数的 SQL 字符串中,这样我就可以动态地 move/aggregate 基于 execution_date 的数据,这对作业重新 运行 和回填很有用。
到目前为止,DAG 的基本结构如下:
def create_db_engine():
[redacted]
return engine
def run_query(sql):
engine = create_db_engine()
connection = engine.connect()
data = connection.execute(sql)
return data
def wait_for_data():
sensor_query = f'''
select blah
from table
limit 1
'''
if run_query(sensor_query).rowcount >= 1:
return True
else:
return False
def run_aggregation():
agg_query = f'''
delete from table
where datefield = '{{ prev_ds }}'::DATE;
insert into table(datefield, metric)
select date_trunc('day', timefield), sum(metric)
from sourcetable
where timefield >= '{{ prev_ds }}'::DATE
and timefield < '{{ ds }}'::DATE
group by 1;
'''
run_query(agg_query)
@task
def data_sensor():
PythonSensor(
task_id='check_data',
python_callable=wait_for_data(),
)
@task
def agg_operator(args):
PythonOperator(
task_id='agg_data',
python_callable=run_aggregation()
)
Summary/notes:
- 基本前提是等待某事发生,然后运行一个能够利用执行日期的查询。
- 我正在尝试使用 {{}} 宏语法,但它似乎无法在 operator/sensor 调用之外使用。
- 我正在使用 SQL 炼金术,因为我使用 IAM 角色链通过 AWS Redshift 进行身份验证,但我找不到使其与 SQLoperators/sensors 一起工作的方法。尽管如果有人对此有解决方案,那将是一个不错的额外答案。
- Python 3.9,气流 2.1.2。数据库是 Amazon Redshift。
我尝试了几种不同的方法来使事情正常进行:
kwargs #1 -
根据这里的答案 [ provide_context=True 应该可以通过 kwargs 将变量作为 **kwargs 传递给函数。但这对我不起作用。
def data_sensor():
PythonSensor(
task_id='check_data',
python_callable=wait_for_data(),
poke_interval=30,
timeout=3600,
provide_context=True
)
...
ds = kwargs['ds']
prev_ds = kwargs['prev_ds']
...
Error
ds = kwargs['ds']
KeyError: 'ds'
kwargs #2 -
这里的答案 [ 建议通过 templates_dict 变量将您想要的字段添加到模板中。但这也不管用
def data_sensor():
PythonSensor(
task_id='check_data',
python_callable=wait_for_data(),
poke_interval=30,
timeout=3600,
provide_context=True,
templates_dict={'start_date': '{{ prev_ds }}',
'end_date': '{{ ds }}',
'next_date': '{{ next_ds }}'},
)
...
Error
end_date = kwargs.get(['templates_dict']).get(['ds'])
TypeError: unhashable type: 'list'
所以我的问题是:
- 这是怎么回事?这可能吗?
- 这是实现我需要的正确范例吗?或者有没有更简单的方法?
这实际上很有可能,而且不是一个坏主意,但您必须通过 JINJA 模板引擎手动 运行 您的字符串(这是 Airflow 在处理传递给运算符的模板化参数时所做的)。
Airflow 会自动为添加到 templated_fields
列表的所有字段执行此操作 - 但由于您的操作员是 Python 代码,因此没有什么可以阻止您手动进行类似处理。
您不应像您尝试的那样使用 PythonOperator。使用 TaskFlow API,@task 会自动用 PythonOperator 包装您的可调用 Python 方法,因此您的任务只是编写正确的 Python 代码 - 甚至无需考虑 Python运算符.
唯一的困难是您需要获取上下文,但这可以通过 get_current_context() 方法轻松实现:Passing arguements using Taskflow API in Airflow 2.0
一旦你有了上下文(这正是包含所有 {{ next_ds }} 和其他上下文变量的内容)你可以简单地获取你的字符串并使用 Jinja 模板处理它,将上下文传递给 JINJA。您可以看到 Airflow 在内部是如何做到的:https://github.com/apache/airflow/blob/932c3b5df5444970484f0ec23589be1820a3270d/airflow/models/baseoperator.py#L1070 - 它有点复杂,因为它处理了 xcom 等几种不同的情况,但您可以将其作为灵感。
除了 Jarek 的回答(确实有效)之外,我发现了一种更直接的方法来完成我需要的事情,不需要 Jinja 模板。事实证明,您可以导入 get_current_context 函数并使用它在函数之间传递作业上下文。
from airflow.operators.python import PythonOperator, get_current_context
...
@task
def data_sensor():
context = get_current_context()
PythonSensor(
task_id='check_data',
python_callable=wait_for_data(context),
poke_interval=30,
timeout=3600
)
....
def wait_for_data(context):
end_date = context['ds']
next_date = context['next_ds']
希望这是一个简单的问题。
我是 Airflow 的菜鸟,我正在尝试用 Airflow DAG 替换我的一个 ETL 作为 POC,但我正在努力做一件基本的事情。
我希望将 DAG 运行 的 execution_date 或 ds 宏注入到外部函数的 SQL 字符串中,这样我就可以动态地 move/aggregate 基于 execution_date 的数据,这对作业重新 运行 和回填很有用。
到目前为止,DAG 的基本结构如下:
def create_db_engine():
[redacted]
return engine
def run_query(sql):
engine = create_db_engine()
connection = engine.connect()
data = connection.execute(sql)
return data
def wait_for_data():
sensor_query = f'''
select blah
from table
limit 1
'''
if run_query(sensor_query).rowcount >= 1:
return True
else:
return False
def run_aggregation():
agg_query = f'''
delete from table
where datefield = '{{ prev_ds }}'::DATE;
insert into table(datefield, metric)
select date_trunc('day', timefield), sum(metric)
from sourcetable
where timefield >= '{{ prev_ds }}'::DATE
and timefield < '{{ ds }}'::DATE
group by 1;
'''
run_query(agg_query)
@task
def data_sensor():
PythonSensor(
task_id='check_data',
python_callable=wait_for_data(),
)
@task
def agg_operator(args):
PythonOperator(
task_id='agg_data',
python_callable=run_aggregation()
)
Summary/notes:
- 基本前提是等待某事发生,然后运行一个能够利用执行日期的查询。
- 我正在尝试使用 {{}} 宏语法,但它似乎无法在 operator/sensor 调用之外使用。
- 我正在使用 SQL 炼金术,因为我使用 IAM 角色链通过 AWS Redshift 进行身份验证,但我找不到使其与 SQLoperators/sensors 一起工作的方法。尽管如果有人对此有解决方案,那将是一个不错的额外答案。
- Python 3.9,气流 2.1.2。数据库是 Amazon Redshift。
我尝试了几种不同的方法来使事情正常进行:
kwargs #1 -
根据这里的答案 [
def data_sensor():
PythonSensor(
task_id='check_data',
python_callable=wait_for_data(),
poke_interval=30,
timeout=3600,
provide_context=True
)
...
ds = kwargs['ds']
prev_ds = kwargs['prev_ds']
...
Error
ds = kwargs['ds']
KeyError: 'ds'
kwargs #2 - 这里的答案 [ 建议通过 templates_dict 变量将您想要的字段添加到模板中。但这也不管用
def data_sensor():
PythonSensor(
task_id='check_data',
python_callable=wait_for_data(),
poke_interval=30,
timeout=3600,
provide_context=True,
templates_dict={'start_date': '{{ prev_ds }}',
'end_date': '{{ ds }}',
'next_date': '{{ next_ds }}'},
)
...
Error
end_date = kwargs.get(['templates_dict']).get(['ds'])
TypeError: unhashable type: 'list'
所以我的问题是:
- 这是怎么回事?这可能吗?
- 这是实现我需要的正确范例吗?或者有没有更简单的方法?
这实际上很有可能,而且不是一个坏主意,但您必须通过 JINJA 模板引擎手动 运行 您的字符串(这是 Airflow 在处理传递给运算符的模板化参数时所做的)。
Airflow 会自动为添加到 templated_fields
列表的所有字段执行此操作 - 但由于您的操作员是 Python 代码,因此没有什么可以阻止您手动进行类似处理。
您不应像您尝试的那样使用 PythonOperator。使用 TaskFlow API,@task 会自动用 PythonOperator 包装您的可调用 Python 方法,因此您的任务只是编写正确的 Python 代码 - 甚至无需考虑 Python运算符.
唯一的困难是您需要获取上下文,但这可以通过 get_current_context() 方法轻松实现:Passing arguements using Taskflow API in Airflow 2.0
一旦你有了上下文(这正是包含所有 {{ next_ds }} 和其他上下文变量的内容)你可以简单地获取你的字符串并使用 Jinja 模板处理它,将上下文传递给 JINJA。您可以看到 Airflow 在内部是如何做到的:https://github.com/apache/airflow/blob/932c3b5df5444970484f0ec23589be1820a3270d/airflow/models/baseoperator.py#L1070 - 它有点复杂,因为它处理了 xcom 等几种不同的情况,但您可以将其作为灵感。
除了 Jarek 的回答(确实有效)之外,我发现了一种更直接的方法来完成我需要的事情,不需要 Jinja 模板。事实证明,您可以导入 get_current_context 函数并使用它在函数之间传递作业上下文。
from airflow.operators.python import PythonOperator, get_current_context
...
@task
def data_sensor():
context = get_current_context()
PythonSensor(
task_id='check_data',
python_callable=wait_for_data(context),
poke_interval=30,
timeout=3600
)
....
def wait_for_data(context):
end_date = context['ds']
next_date = context['next_ds']