apache Apache Airflow/Cloud Composer 中多个客户端的最佳实践?

Best practices for multiple clients in apache Apache Airflow/Cloud Composer?

问题:

Airflow 中是否有任何让事情变得简单的最佳实践?我正在考虑(排名不分先后):

我找不到很多关于这个特定用例的 material。

理想情况下,我们有一个 "template" 每个客户都可以重复使用。目前尚不清楚一份工作还是多项工作是最好的解决方案。或者也许有另一种方法可以更好地使用这种用法?ui

Airflow 广泛支持 Google 云平台。但请注意,大多数 Hooks 和 Operators 都在 contrib 部分,这意味着它们处于测试状态,这意味着它们可以在次要版本之间进行重大更改。

客户端方面数:

DAG 可以有任意多个,每个 DAG 可以提及多个任务。建议在一个 DAG 文件中保留一个逻辑工作流,并尽量保持它非常轻(例如配置文件)。它允许 Airflow 调度程序在每次心跳时花费更少的时间和资源来处理它们。

可以根据任意数量的配置参数动态创建 DAG(具有相同的基本代码),这在有很多客户端时非常有用且省时。

要创建新的 DAG,请在 create_dag 函数中创建一个 DAG 模板。代码可以包装在允许传入自定义参数的方法中。此外,输入参数不必存在于 dag 文件本身中。生成 DAG 的另一种常见形式是在 Variable 对象中设置值。请参阅 here 了解更多信息。

特定客户端配置:

您可以使用宏用于在 运行 时将动态信息传递到任务实例中。可在 here.

中找到可在所有模板中访问的默认变量列表

Airflow 对 Jinja templating 的内置支持使用户能够传递可在模板化字段中使用的参数。

UI 概述

如果您的 dag 加载时间较长,您可以将 airflow.cfg 中的 default_dag_run_display_number 配置值减小到较小的值。此可配置项控制 UI 中显示的 dag 运行 的数量,默认值为 25。

模块化

如果将 default_args 的字典传递给 DAG,它会将它们应用于其任何运算符。这使得将通用参数应用于许多运算符变得容易,而无需多次键入它。

举个例子:

from datetime import datetime, timedelta

default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('my_dag', default_args=default_args)
op = DummyOperator(task_id='dummy', dag=dag)
print(op.owner) # Airflow

有关 BaseOperator 的参数及其作用的更多信息,请参阅 airflow.models.BaseOperator 文档。

性能

可以使用您可以控制的变量来提高气流 DAG 性能(可以在 airflow.cfg 中设置):

  • parallelism:控制在整个 Airflow 集群中同时 运行 的任务实例数。
  • concurrency:Airflow 调度程序将 运行 在任何给定时间都不会超过 DAG 的并发任务实例。并发性在您的 Airflow DAG 中定义。如果您没有在 DAG 上设置并发性,调度程序将使用 airflow.cfg.
  • 中 dag_concurrency 条目的默认值
  • task_concurrency:此变量控制每个任务 dag_runs 中并发 运行ning 任务实例的数量。
  • max_active_runs:Airflow 调度程序将 运行 在给定时间不超过 max_active_runs DAG 的 DagRuns。
  • pool:此变量控制分配给池的并发 运行ning 任务实例数。

您可以在 composer 实例存储桶中看到气流配置 gs://composer_instance_bucket/airflow.cfg。您可以根据需要调整此配置,但请记住,cloud composer 已阻止某些配置。

缩放

请记住,建议节点数必须大于 3,将此数字保持在 3 以下可能会导致一些问题,如果您想向上升级节点数,可以使用 gcloud command to specify this value. Also please note that, there are some airflow configurations related to autoscalling blocked and can't be overridden。 一些 Airflow 配置是为 Cloud Composer 预先配置的,您无法更改它们。

容错

请参考以下documentation.

重新执行

就像对象是 class 的实例一样,Airflow 任务是 Operator (BaseOperator) 的实例。因此,编写一个 "re-usable" 运算符并通过传递不同的参数在您的管道中使用它数百次。

延迟

可以通过以下方式减少生产中的气流 DAG 调度延迟:

  • max_threads:调度程序将并行生成多个线程来调度 dag。这由 max_threads 控制,默认值为 2。
  • scheduler_heartbeat_sec:用户应考虑将 scheduler_heartbeat_sec 配置增加到更高的值(例如 60 秒),该值控制气流调度程序获取心跳并更新数据库中作业条目的频率。

请参考以下有关最佳实践的文章:

希望对您有所帮助。