气流 |设置变量

Airflow | Set Variable

有谁知道从 Airflow 中的 DAG 更新变量的语法,但采用 JSON 格式?

现在我有这个:

Variable.set(f"update_{kwargs['table_id']}", *last_update)

这会将变量更新为:

但这会创建不可扩展的新变量。

理想情况下,我想更新同一个变量并以 JSON 格式传递它们:

EG: "last_ids": {"updated_giftcard_id:" "0", "updated_order_id:" "1", etc}"

我尝试传递一些参数,例如 Variable.set(key="updated_giftcard_id", value="0", serialize=True),但无法正常工作,因为它抱怨我传递了太多参数。

提前致谢!

Airflow 中的变量始终是字符串。您最多可以手动将它们序列化为字符串,然后在读回它们时反序列化。

写作:

Variable.set(key="updated_giftcard_id", value=json.dumps(dictionary))

阅读中:

dict = json.loads(Variable.get(key="updated_giftcard_id")

Variable.set()Variable.get() 方法确实有一个 serialize/deserialize 参数,分别是 serialize_jsondeserialize_json,用于本地处理 JSON -类型变量。

你可以从这样的事情中推断出来:

import logging
from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from datetime import datetime


@task
def variable_set():
    Variable.set(key="updated_giftcard_id", value="0", serialize_json=True)
    Variable.set(
        key="last_ids", value={"updated_giftcard_id": "0", "updated_order_id": "1"}, serialize_json=True
    )

@task
def variable_get():
    logging.info(Variable.get(key="updated_giftcard_id", deserialize_json=True))
    logging.info(Variable.get(key="last_ids", deserialize_json=True))

with DAG(
    "example",
    start_date=datetime(2021, 8, 13),
    schedule_interval=None,
    catchup=False,
) as dag:
    variable_set() >> variable_get()

“variable_set”任务的结果: “variable_get”任务的结果: