如何导出持久环境变量

How to export a persistent env variable

我想从自定义挂钩导出环境变量,并能够在下次初始化挂钩时检索该环境变量的值。

具体来说,是一个自定义的SnowflakeHook,其中我想检查一个preset(在docker-compose文件中)的env变量是否有一定的值,如果有,则导出做一些事情后的另一个环境变量。我为此创建了一个额外的方法,代码如下:

    env = os.environ['ENV']
    user = os.environ['USER'].replace('.', '_')
    if env == 'dev':
        logging.info('Development environment detected')

        dev_db_name = f'{env}_{user}'

        try:
            if os.environ['DEV_DATABASE_CREATED'] == 'True':
                logging.info('Dev database already exists')
        except KeyError:
            self.run(f"""CREATE DATABASE {dev_db_name} CLONE {self.database}""")
            os.environ['DEV_DATABASE_CREATED'] = 'True'
            logging.info(f'Dev database {dev_db_name} created')

        self.run(f"USE DATABASE {dev_db_name};")
        self.run(sql, autocommit, parameters)

此代码检查环境变量 ENV 是否为 'dev',如果是,则尝试创建新数据库并导出环境变量 DEV_DATABASE_CREATED。这里的问题是导出的变量不会持续存在。数据库已克隆,显示了日志记录 (f'Dev database {dev_db_name} created'),但下次我执行 SnowflakeHook 时,它给了我一个 KeyError,尽管它说正在尝试创建的数据库已经存在。

那么,有没有办法让 DEV_DATABASE_CREATED 坚持下去?

您可以将其存储为 Airflow 变量 (https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html) - 它们将保留在数据库中。

然而,这将适用于不“依赖于”您工作的数据间隔的任何类型的变量。通常,对于指定的数据间隔(小时、天、周等),Airflow DAG 运行s 可以是 运行,并且有多个 DAG 运行s - 每个间隔一个。如果您的值在同一 DAG 运行 的多个数据间隔中将是“相同的”,您可以使用气流变量来存储此类值。

另一方面,如果该值取决于数据间隔,则变量应存储为 XCom:https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html。其中一个任务(通常是第一个)应该生成 xcom 的值,其他任务应该从 XCom 读取它。这种方法的优点是它是“幂等的”——因为每个间隔可以有不同的 Xcom 值,所以你可以重新 运行 过去的数据间隔而不影响其他间隔,因为每个数据间隔都有它自己的“ space" 要使用和操作的值。