如何从 python 中的气流变量中过滤
How to filter from airflow variables in python
我在 Airflow 中有一些变量,我试图在 python 列表中调用这些变量,但我无法那样做。如果我使用这些值,它的工作原理,但我有大列表,所以我需要使用气流变量
此代码运行良好
filtered_data = [x for x in data if x['id_job'] in [14,67]]
但是,我正在尝试这个,但没有用
id_job=get_vars['id_job']
filtered_data = [x for x in data if x['id_job'] = Variable.get('id_job')]
我在 Airflow 变量中创建了这个变量 'id_job' 并具有各自的值。
当我从 get_vars 打印 id_job 时,我可以看到我的值列表,但是当我在列表内部调用时却不起作用。
有人知道我该如何解决吗?
每次调用 Variable.get
时,您都在向后端数据库发出请求。将变量保存为 JSON 是一个很好的做法,这样您就可以调用一次整个 JSON 文件,然后您可以对所有变量做任何您想做的事情,从而节省数据库很多压力。
为此,您应该有一个如下所示的文件(例如,名为 variables_json.json):
{
"variables_secret": {
"job_id_61": "61",
"s3_airflow": "airflow"
}
}
然后,为 Airflow 1.10 版本导入它是:
airflow variables -i variables_secret.json
或者如果气流 > 2.0:
airflow variables import variables_secret.json
之后,python代码为:
variables = Variable.get('variables_secret', deserialize_json=True)
最后,您将可以访问所有变量,例如:
variables['id_job_61']
有了字典,你可以做任何你需要的检查
我已经试过了
我的旧代码:
get_vars = Variable.get('my_variables', default_var='', deserialize_json=True)
filtered_data = [x for x in data if x['id_job'] = get_vars['id_job']]
当我这样做时,我收到 Broken DAG: invalid syntax(第 2 行)。
我认为原因是因为它在列表中,但我不知道如何修复它
对于 Python 中的相等表达式,您需要两个等号;单个等号是赋值运算符。对第 2 行试试这个:
filtered_data = [x for x in data if x['id_job'] == get_vars['id_job']]
我在 Airflow 中有一些变量,我试图在 python 列表中调用这些变量,但我无法那样做。如果我使用这些值,它的工作原理,但我有大列表,所以我需要使用气流变量
此代码运行良好
filtered_data = [x for x in data if x['id_job'] in [14,67]]
但是,我正在尝试这个,但没有用
id_job=get_vars['id_job']
filtered_data = [x for x in data if x['id_job'] = Variable.get('id_job')]
我在 Airflow 变量中创建了这个变量 'id_job' 并具有各自的值。 当我从 get_vars 打印 id_job 时,我可以看到我的值列表,但是当我在列表内部调用时却不起作用。 有人知道我该如何解决吗?
每次调用 Variable.get
时,您都在向后端数据库发出请求。将变量保存为 JSON 是一个很好的做法,这样您就可以调用一次整个 JSON 文件,然后您可以对所有变量做任何您想做的事情,从而节省数据库很多压力。
为此,您应该有一个如下所示的文件(例如,名为 variables_json.json):
{
"variables_secret": {
"job_id_61": "61",
"s3_airflow": "airflow"
}
}
然后,为 Airflow 1.10 版本导入它是:
airflow variables -i variables_secret.json
或者如果气流 > 2.0:
airflow variables import variables_secret.json
之后,python代码为:
variables = Variable.get('variables_secret', deserialize_json=True)
最后,您将可以访问所有变量,例如:
variables['id_job_61']
有了字典,你可以做任何你需要的检查
我已经试过了
我的旧代码:
get_vars = Variable.get('my_variables', default_var='', deserialize_json=True)
filtered_data = [x for x in data if x['id_job'] = get_vars['id_job']]
当我这样做时,我收到 Broken DAG: invalid syntax(第 2 行)。 我认为原因是因为它在列表中,但我不知道如何修复它
对于 Python 中的相等表达式,您需要两个等号;单个等号是赋值运算符。对第 2 行试试这个:
filtered_data = [x for x in data if x['id_job'] == get_vars['id_job']]