如何传递动态参数 Airflow 运算符?

How to pass dynamic arguments Airflow operator?

我正在使用 Airflow 在 Google Cloud Composer 上 运行 Spark 作业。我需要

使用 Airflow API - 我可以读取 YAML 文件,并使用 xcom 跨任务推送变量。

但是,考虑 DataprocClusterCreateOperator()

和其他一些参数被标记为模板化。

如果我想传入其他参数作为模板(目前不是这样)怎么办? - 喜欢 image_versionnum_workersworker_machine_type 等等?

有什么解决方法吗?

不确定'dynamic'是什么意思,但是当yaml文件更新时,如果读取文件进程在dag文件体中,dag将被刷新以从yaml文件申请新的args。所以实际上,您不需要 XCOM 来获取参数。 只需简单地创建一个参数字典然后传递给 default_args:

CONFIGFILE = os.path.join(
    os.path.dirname(os.path.realpath(\__file__)), 'your_yaml_file')

with open(CONFIGFILE, 'r') as ymlfile:
    CFG = yaml.load(ymlfile)

default_args = {
    'cluster_name': CFG['section_A']['cluster_name'], # edit here according to the structure of your yaml file.
    'project_id': CFG['section_A']['project_id'],
    'zone': CFG['section_A']['zone'],
    'mage_version': CFG['section_A']['image_version'],
    'num_workers': CFG['section_A']['num_workers'],
    'worker_machine_type': CFG['section_A']['worker_machine_type'],
    # you can add all needs params here.
}

DAG = DAG(
    dag_id=DAG_NAME,
    schedule_interval=SCHEDULE_INTEVAL,
    default_args=default_args, # pass the params to DAG environment
)

Task1 = DataprocClusterCreateOperator(
    task_id='your_task_id',
    dag=DAG
)

但是如果您想要动态 dag 而不是参数,您可能需要其他策略,例如 this

所以你可能需要弄清楚基本思路: 动态在哪个层次?任务等级? DAG 级别?

或者您可以创建自己的 Operator 来完成这项工作并获取参数。