如何在触发 Airflow dag 之前检索通过 JSON 解析的值?

How can I retrieve a value that is being parsed via JSON before triggering Airflow dag?

我有一个气流 dag,我在触发它之前将 JSON 解析为输入。我能够通过 **kwargs 检索值并将它们推送到 xcom 以及 PythonOperator 方法中。

def push_to_xcom(ds, **kwargs):
    shape_change_tables = []
    ss_cd = {}
    env = ''

    if 'env' in kwargs['dag_run'].conf:
        env = kwargs['dag_run'].conf['env']
    else:
        env = 'dev'
        print("by default environment take as 'dev'")

    if isinstance(kwargs['dag_run'].conf['ss_cd'], dict):
        ss_cd = dict(kwargs['dag_run'].conf['ss_cd'])
    else:
        print('<<<<<<<<<<Pass sscd as an argument>>>>>>>>>>>')
        sys.exit(-1)

但我无法做的是检索 PythonOperator 之外的值,它在 dag 脚本内但不在任何运算符内。

我尝试了很多选项,例如使用 Variable.set 和 Variable.get 以及使用 Jinja 模板都没有用。有人可以告诉我如何在 dag 脚本中检索值吗?

# setting it within a method after getting the value via kwargs
Variable.set(key="ss_cd", value=ss_cd)

ss_cd = airflow.models.Variable.get('ss_cd')

这是示例JSON我正在传递:

{
"env":"qa",
"ss_cd":{"type":"sfdc","ss_cd":"sfdc"
}

如果我需要提供更多详细信息,请告诉我

我能够使用以下方法提取值:

conf = dag.get_dagrun(execution_date=dag.latest_execution_date).conf
env= conf['env']
ss_cd = dict(conf['ss_cd'])