气流,避免来自 SSM 的顶级拉动
Airflow, avoid top level pull from SSM
我有以下 DAG,效果很好:
from airflow import DAG
from airflow.models import Variable
from airflow.operators.subdag import SubDagOperator
from subdags import my_subdag
data_sets = Variable.get("data_sets", deserialize_json=True).get("data")
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2020, 1, 1),
}
with DAG(
'myDAG',
default_args=default_args,
schedule_interval='00 12 * * *'
) as dag:
...
for data_set in data_sets:
subdag = SubDagOperator(
task_id=f'{data_set}_subdag',
subdag=my_subdag(
parent_dag_name='myDAG',
child_dag_name=f'{data_set}_subdag',
),
...
default_args=default_args,
)
start >> subdag >> end
但如您所见,我在顶层调用 Variable,这不是最佳实践(调度程序每分钟左右查询一次秘密后端)。
我该怎么做才能使气流仅在执行期间调用 Variable.get?我正在查看 best practices,我不能使用另一个文件 ('Generating Python code with embedded meta-data') 所以我虽然也许 jinja 模板可以提供帮助,但我不确定如何继续。
没有。你目前不能做不同的事情。如果您想拥有一个基于某些外部资源的动态 DAG 结构,那么您只能使用 Top-Level 代码(并遵循最佳实践)。
但是如果你想有N个相同的任务(有一些可变索引),你想在任务执行时根据一些动态数据(这似乎是你想做的)动态启动,有一个变化即将推出的 2.3.0 将使您能够做到这一点。
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-42+Dynamic+Task+Mapping
我有以下 DAG,效果很好:
from airflow import DAG
from airflow.models import Variable
from airflow.operators.subdag import SubDagOperator
from subdags import my_subdag
data_sets = Variable.get("data_sets", deserialize_json=True).get("data")
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2020, 1, 1),
}
with DAG(
'myDAG',
default_args=default_args,
schedule_interval='00 12 * * *'
) as dag:
...
for data_set in data_sets:
subdag = SubDagOperator(
task_id=f'{data_set}_subdag',
subdag=my_subdag(
parent_dag_name='myDAG',
child_dag_name=f'{data_set}_subdag',
),
...
default_args=default_args,
)
start >> subdag >> end
但如您所见,我在顶层调用 Variable,这不是最佳实践(调度程序每分钟左右查询一次秘密后端)。
我该怎么做才能使气流仅在执行期间调用 Variable.get?我正在查看 best practices,我不能使用另一个文件 ('Generating Python code with embedded meta-data') 所以我虽然也许 jinja 模板可以提供帮助,但我不确定如何继续。
没有。你目前不能做不同的事情。如果您想拥有一个基于某些外部资源的动态 DAG 结构,那么您只能使用 Top-Level 代码(并遵循最佳实践)。
但是如果你想有N个相同的任务(有一些可变索引),你想在任务执行时根据一些动态数据(这似乎是你想做的)动态启动,有一个变化即将推出的 2.3.0 将使您能够做到这一点。
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-42+Dynamic+Task+Mapping