气流 |设置变量
Airflow | Set Variable
有谁知道从 Airflow 中的 DAG 更新变量的语法,但采用 JSON 格式?
现在我有这个:
Variable.set(f"update_{kwargs['table_id']}", *last_update)
这会将变量更新为:
updated_giftcard_id 0
updated_order_id 0
但这会创建不可扩展的新变量。
理想情况下,我想更新同一个变量并以 JSON 格式传递它们:
EG: "last_ids": {"updated_giftcard_id:" "0", "updated_order_id:" "1", etc}"
我尝试传递一些参数,例如 Variable.set(key="updated_giftcard_id", value="0", serialize=True)
,但无法正常工作,因为它抱怨我传递了太多参数。
提前致谢!
Airflow 中的变量始终是字符串。您最多可以手动将它们序列化为字符串,然后在读回它们时反序列化。
写作:
Variable.set(key="updated_giftcard_id", value=json.dumps(dictionary))
阅读中:
dict = json.loads(Variable.get(key="updated_giftcard_id")
Variable.set()
和 Variable.get()
方法确实有一个 serialize/deserialize 参数,分别是 serialize_json
和 deserialize_json
,用于本地处理 JSON -类型变量。
你可以从这样的事情中推断出来:
import logging
from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from datetime import datetime
@task
def variable_set():
Variable.set(key="updated_giftcard_id", value="0", serialize_json=True)
Variable.set(
key="last_ids", value={"updated_giftcard_id": "0", "updated_order_id": "1"}, serialize_json=True
)
@task
def variable_get():
logging.info(Variable.get(key="updated_giftcard_id", deserialize_json=True))
logging.info(Variable.get(key="last_ids", deserialize_json=True))
with DAG(
"example",
start_date=datetime(2021, 8, 13),
schedule_interval=None,
catchup=False,
) as dag:
variable_set() >> variable_get()
“variable_set”任务的结果:
“variable_get”任务的结果:
有谁知道从 Airflow 中的 DAG 更新变量的语法,但采用 JSON 格式?
现在我有这个:
Variable.set(f"update_{kwargs['table_id']}", *last_update)
这会将变量更新为:
updated_giftcard_id 0
updated_order_id 0
但这会创建不可扩展的新变量。
理想情况下,我想更新同一个变量并以 JSON 格式传递它们:
EG: "last_ids": {"updated_giftcard_id:" "0", "updated_order_id:" "1", etc}"
我尝试传递一些参数,例如 Variable.set(key="updated_giftcard_id", value="0", serialize=True)
,但无法正常工作,因为它抱怨我传递了太多参数。
提前致谢!
Airflow 中的变量始终是字符串。您最多可以手动将它们序列化为字符串,然后在读回它们时反序列化。
写作:
Variable.set(key="updated_giftcard_id", value=json.dumps(dictionary))
阅读中:
dict = json.loads(Variable.get(key="updated_giftcard_id")
Variable.set()
和 Variable.get()
方法确实有一个 serialize/deserialize 参数,分别是 serialize_json
和 deserialize_json
,用于本地处理 JSON -类型变量。
你可以从这样的事情中推断出来:
import logging
from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from datetime import datetime
@task
def variable_set():
Variable.set(key="updated_giftcard_id", value="0", serialize_json=True)
Variable.set(
key="last_ids", value={"updated_giftcard_id": "0", "updated_order_id": "1"}, serialize_json=True
)
@task
def variable_get():
logging.info(Variable.get(key="updated_giftcard_id", deserialize_json=True))
logging.info(Variable.get(key="last_ids", deserialize_json=True))
with DAG(
"example",
start_date=datetime(2021, 8, 13),
schedule_interval=None,
catchup=False,
) as dag:
variable_set() >> variable_get()
“variable_set”任务的结果: