气流,避免来自 SSM 的顶级拉动

Airflow, avoid top level pull from SSM

我有以下 DAG,效果很好:

from airflow import DAG
from airflow.models import Variable
from airflow.operators.subdag import SubDagOperator
from subdags import my_subdag

data_sets = Variable.get("data_sets", deserialize_json=True).get("data")

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 1, 1),
}

with DAG(
        'myDAG',
        default_args=default_args,
        schedule_interval='00 12 * * *'
) as dag:

    ...

    for data_set in data_sets:
        subdag = SubDagOperator(
            task_id=f'{data_set}_subdag',
            subdag=my_subdag(
                parent_dag_name='myDAG',
                child_dag_name=f'{data_set}_subdag',
            ),
            ...
            default_args=default_args,
        )
        start >> subdag >> end

但如您所见,我在顶层调用 Variable,这不是最佳实践(调度程序每分钟左右查询一次秘密后端)。

我该怎么做才能使气流仅在执行期间调用 Variable.get?我正在查看 best practices,我不能使用另一个文件 ('Generating Python code with embedded meta-data') 所以我虽然也许 jinja 模板可以提供帮助,但我不确定如何继续。

没有。你目前不能做不同的事情。如果您想拥有一个基于某些外部资源的动态 DAG 结构,那么您只能使用 Top-Level 代码(并遵循最佳实践)。

但是如果你想有N个相同的任务(有一些可变索引),你想在任务执行时根据一些动态数据(这似乎是你想做的)动态启动,有一个变化即将推出的 2.3.0 将使您能够做到这一点。

https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-42+Dynamic+Task+Mapping