如何在触发 Airflow dag 之前检索通过 JSON 解析的值?
How can I retrieve a value that is being parsed via JSON before triggering Airflow dag?
我有一个气流 dag,我在触发它之前将 JSON 解析为输入。我能够通过 **kwargs 检索值并将它们推送到 xcom 以及 PythonOperator 方法中。
def push_to_xcom(ds, **kwargs):
shape_change_tables = []
ss_cd = {}
env = ''
if 'env' in kwargs['dag_run'].conf:
env = kwargs['dag_run'].conf['env']
else:
env = 'dev'
print("by default environment take as 'dev'")
if isinstance(kwargs['dag_run'].conf['ss_cd'], dict):
ss_cd = dict(kwargs['dag_run'].conf['ss_cd'])
else:
print('<<<<<<<<<<Pass sscd as an argument>>>>>>>>>>>')
sys.exit(-1)
但我无法做的是检索 PythonOperator 之外的值,它在 dag 脚本内但不在任何运算符内。
我尝试了很多选项,例如使用 Variable.set 和 Variable.get 以及使用 Jinja 模板都没有用。有人可以告诉我如何在 dag 脚本中检索值吗?
# setting it within a method after getting the value via kwargs
Variable.set(key="ss_cd", value=ss_cd)
ss_cd = airflow.models.Variable.get('ss_cd')
这是示例JSON我正在传递:
{
"env":"qa",
"ss_cd":{"type":"sfdc","ss_cd":"sfdc"
}
如果我需要提供更多详细信息,请告诉我
我能够使用以下方法提取值:
conf = dag.get_dagrun(execution_date=dag.latest_execution_date).conf
env= conf['env']
ss_cd = dict(conf['ss_cd'])
我有一个气流 dag,我在触发它之前将 JSON 解析为输入。我能够通过 **kwargs 检索值并将它们推送到 xcom 以及 PythonOperator 方法中。
def push_to_xcom(ds, **kwargs):
shape_change_tables = []
ss_cd = {}
env = ''
if 'env' in kwargs['dag_run'].conf:
env = kwargs['dag_run'].conf['env']
else:
env = 'dev'
print("by default environment take as 'dev'")
if isinstance(kwargs['dag_run'].conf['ss_cd'], dict):
ss_cd = dict(kwargs['dag_run'].conf['ss_cd'])
else:
print('<<<<<<<<<<Pass sscd as an argument>>>>>>>>>>>')
sys.exit(-1)
但我无法做的是检索 PythonOperator 之外的值,它在 dag 脚本内但不在任何运算符内。
我尝试了很多选项,例如使用 Variable.set 和 Variable.get 以及使用 Jinja 模板都没有用。有人可以告诉我如何在 dag 脚本中检索值吗?
# setting it within a method after getting the value via kwargs
Variable.set(key="ss_cd", value=ss_cd)
ss_cd = airflow.models.Variable.get('ss_cd')
这是示例JSON我正在传递:
{
"env":"qa",
"ss_cd":{"type":"sfdc","ss_cd":"sfdc"
}
如果我需要提供更多详细信息,请告诉我
我能够使用以下方法提取值:
conf = dag.get_dagrun(execution_date=dag.latest_execution_date).conf
env= conf['env']
ss_cd = dict(conf['ss_cd'])