每个操作员中的气流 DAG 娱乐
Airflow DAGs recreation in each operator
我们在 Kubernetes 上使用 Airflow 2.1.4 和 运行。
我们已经为 Web 服务器、调度程序分离了 pods,并且我们正在使用 Kubernetes 执行器。
我们正在使用各种运算符,例如 PythonOperator
、KubernetesPodOperator
等
我们的设置处理约 2000 个客户(企业),每个客户都有自己的 DAG。
我们的代码类似于:
def get_customers():
logger.info("querying database to get all customers")
return sql_connection.query(SELECT id, name, offset FROM customers_table)
customers = get_customers()
for id, name, offset in customers:
dag = DAG(
dag_id=f"{id}-{name}",
schedule_interval=offset,
)
with dag:
first = PythonOperator(..)
second = KubernetesPodOperator(..)
third = SimpleHttpOperator(..)
first >> second >> third
globals()[id] = dag
上面的代码片段是我们现有内容的简化版本,但我们在 DAG 中有几十个运算符(而不仅仅是三个)。
问题在于,对于每个 DAG 中的每个运算符,我们都会看到 querying database to get all customers
日志 - 这意味着我们查询数据库的方式比我们想要的要多。
数据库更新不频繁,DAG一天只能更新一到两次。
我知道 DAG 被保存在元数据数据库或其他东西中..
- 有没有一种方法可以只构建这些 DAG 一次/通过调度程序,而不是每个操作员都这样做?
- 我们是否应该更改设计以支持我们的多租户需求?还有比这更好的选择吗?
在我们的例子中,~60 个操作员 X ~2,000 个客户 = ~120,000 个数据库查询。
是的,这完全是预料之中的。 DAG 由 Airflow 定期解析(默认为每 30 秒),因此任何顶级代码(在解析文件期间执行的代码而不是运算符的“执行”方法)都会被执行。
简单的回答(和最佳实践)是“不要在 DAG 的顶级代码中使用任何繁重的操作”。特别是不要使用数据库查询。但是如果你想要一些更具体的答案和可能的处理方式,Airflow 文档中有关于最佳实践的专门章节:
这就是为什么顶级代码应该“轻”的解释https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#top-level-python-code
这是关于当您像在您的案例中那样进行动态 DAG 生成时可以用来避免顶级代码中的“繁重”操作的策略:https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#dynamic-dag-generation
总之提出了三种方式:
- 使用环境变量
- 通过外部脚本自动(定期)从您的数据库生成配置文件(例如 .json)并将其放在您的 DAG 旁边,并通过您的 DAG 从中读取 json 文件那里而不是使用 sql 查询。
- 动态生成许多 DAG python 文件(例如使用 JINJA)也自动和定期使用外部脚本。
我相信您可以使用 2) 或 3) 来实现您的目标。
我们在 Kubernetes 上使用 Airflow 2.1.4 和 运行。
我们已经为 Web 服务器、调度程序分离了 pods,并且我们正在使用 Kubernetes 执行器。
我们正在使用各种运算符,例如 PythonOperator
、KubernetesPodOperator
等
我们的设置处理约 2000 个客户(企业),每个客户都有自己的 DAG。
我们的代码类似于:
def get_customers():
logger.info("querying database to get all customers")
return sql_connection.query(SELECT id, name, offset FROM customers_table)
customers = get_customers()
for id, name, offset in customers:
dag = DAG(
dag_id=f"{id}-{name}",
schedule_interval=offset,
)
with dag:
first = PythonOperator(..)
second = KubernetesPodOperator(..)
third = SimpleHttpOperator(..)
first >> second >> third
globals()[id] = dag
上面的代码片段是我们现有内容的简化版本,但我们在 DAG 中有几十个运算符(而不仅仅是三个)。
问题在于,对于每个 DAG 中的每个运算符,我们都会看到 querying database to get all customers
日志 - 这意味着我们查询数据库的方式比我们想要的要多。
数据库更新不频繁,DAG一天只能更新一到两次。 我知道 DAG 被保存在元数据数据库或其他东西中..
- 有没有一种方法可以只构建这些 DAG 一次/通过调度程序,而不是每个操作员都这样做?
- 我们是否应该更改设计以支持我们的多租户需求?还有比这更好的选择吗?
在我们的例子中,~60 个操作员 X ~2,000 个客户 = ~120,000 个数据库查询。
是的,这完全是预料之中的。 DAG 由 Airflow 定期解析(默认为每 30 秒),因此任何顶级代码(在解析文件期间执行的代码而不是运算符的“执行”方法)都会被执行。
简单的回答(和最佳实践)是“不要在 DAG 的顶级代码中使用任何繁重的操作”。特别是不要使用数据库查询。但是如果你想要一些更具体的答案和可能的处理方式,Airflow 文档中有关于最佳实践的专门章节:
这就是为什么顶级代码应该“轻”的解释https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#top-level-python-code
这是关于当您像在您的案例中那样进行动态 DAG 生成时可以用来避免顶级代码中的“繁重”操作的策略:https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#dynamic-dag-generation
总之提出了三种方式:
- 使用环境变量
- 通过外部脚本自动(定期)从您的数据库生成配置文件(例如 .json)并将其放在您的 DAG 旁边,并通过您的 DAG 从中读取 json 文件那里而不是使用 sql 查询。
- 动态生成许多 DAG python 文件(例如使用 JINJA)也自动和定期使用外部脚本。
我相信您可以使用 2) 或 3) 来实现您的目标。