每个操作员中的气流 DAG 娱乐

Airflow DAGs recreation in each operator

我们在 Kubernetes 上使用 Airflow 2.1.4 和 运行。

我们已经为 Web 服务器、调度程序分离了 pods,并且我们正在使用 Kubernetes 执行器

我们正在使用各种运算符,例如 PythonOperatorKubernetesPodOperator

我们的设置处理约 2000 个客户(企业),每个客户都有自己的 DAG。

我们的代码类似于:

def get_customers():
    logger.info("querying database to get all customers")
    return sql_connection.query(SELECT id, name, offset FROM customers_table)


customers = get_customers()
for id, name, offset in customers:
    dag = DAG(
        dag_id=f"{id}-{name}",
        schedule_interval=offset,
    )
    with dag:
        first = PythonOperator(..)
        second = KubernetesPodOperator(..)
        third = SimpleHttpOperator(..)
        first >> second >> third

    globals()[id] = dag

上面的代码片段是我们现有内容的简化版本,但我们在 DAG 中有几十个运算符(而不仅仅是三个)。

问题在于,对于每个 DAG 中的每个运算符,我们都会看到 querying database to get all customers 日志 - 这意味着我们查询数据库的方式比我们想要的要多。

数据库更新不频繁,DAG一天只能更新一到两次。 我知道 DAG 被保存在元数据数据库或其他东西中..

在我们的例子中,~60 个操作员 X ~2,000 个客户 = ~120,000 个数据库查询。

是的,这完全是预料之中的。 DAG 由 Airflow 定期解析(默认为每 30 秒),因此任何顶级代码(在解析文件期间执行的代码而不是运算符的“执行”方法)都会被执行。

简单的回答(和最佳实践)是“不要在 DAG 的顶级代码中使用任何繁重的操作”。特别是不要使用数据库查询。但是如果你想要一些更具体的答案和可能的处理方式,Airflow 文档中有关于最佳实践的专门章节:

总之提出了三种方式:

  1. 使用环境变量
  2. 通过外部脚本自动(定期)从您的数据库生成配置文件(例如 .json)并将其放在您的 DAG 旁边,并通过您的 DAG 从中读取 json 文件那里而不是使用 sql 查询。
  3. 动态生成许多 DAG python 文件(例如使用 JINJA)也自动和定期使用外部脚本。

我相信您可以使用 2) 或 3) 来实现您的目标。