读取 XCOM 和 Airflow 变量可能会减慢 Airflow(在 Google Cloud Composer 中)

Reading XCOM and Airflow variables probably slows down Airflow (in Google Cloud Composer)

我们正在尝试将每日 (CSV) 提取文件合并到我们的数据仓库中。

在我们的用例中,DAG 的 python 代码对于我们所有的 DAG (~2000) 都是相同的,因此我们通过 DAG 生成器逻辑从单个 python 文件。 在我们的 DAG 中,我们只有 15 个任务(5 个虚拟任务、2 个 CloudDataFusionStartPipelineOperator 任务、8 个 python 任务)。

在 DAG 生成过程中,我们读取气流变量 (~30-50) 以确定要生成的 DAG(这也确定了 DAG 的 ID 和它们应该处理的 schema/table 名称)。我们称这些为生成器变量。

在 DAG 生成过程中,DAG 还通过 ID 读取它们的配置(每个生成的 DAG 多 2-3 个 Airflow 变量)。我们称这些为配置器变量。

不幸的是,在我们的 DAG 中,我们必须处理一些 传递的参数(通过 REST API) 和任务之间的大量 动态计算信息 所以我们依赖 Airflow 的 XCOM 功能。这意味着 Airflow 数据库中的大量读取。

在可能的情况下,我们使用用户定义的宏来配置任务以延迟数据库读取的执行(XCOM 拉取的执行)直到任务被执行,但是它仍然给 Airflow (Google Cloud Composer) 带来沉重的负担。来自 XCOM 的大约 50 个拉动。

问题:


XCOM 拉取示例:

Metadata = PythonOperator(
    task_id         = TASK_NAME_PREFIX__METADATA + str(dag_id),
    python_callable = metadataManagment,
    op_kwargs       = {
        'dag_id'           : dag_id,
        'execution_case'   : '{{ ti.xcom_pull(task_ids="' + TASK_NAME_PREFIX__MANAGE_PARAMS + dag_id + '", key="execution_case_for_metadata") }}',
        'date'             : '{{ ti.xcom_pull(task_ids="' + TASK_NAME_PREFIX__MANAGE_PARAMS + dag_id + '", key="folder_date") }}',
        'enc_path'         : '{{ get_runtime_arg("RR", dag_run, "encryptedfilepath", ti.xcom_pull(task_ids="' + TASK_NAME_PREFIX__MANAGE_PARAMS + dag_id + '", key="folder_date")) }}',
        'dec_path'         : '{{ get_runtime_arg("RR", dag_run, "decryptedfilepath", ti.xcom_pull(task_ids="' + TASK_NAME_PREFIX__MANAGE_PARAMS + dag_id + '", key="folder_date")) }}',
        'aggr_project_name': ast.literal_eval(AIRFLOW_ENVIRONMENT_VARIABLES)['aggr_project_name'],
    },
    provide_context = True,
    trigger_rule    = TriggerRule.ALL_DONE
)

发电机气流变量示例:

key: STD_SCHEMA_NAMES
val: [('SCHEMA1', 'MAIN'), ('SCHEMA2', 'MAIN'), ('SCHEMA2', 'SECONDARY')]

key: STD_MAIN_SCHEMA1_INSERT_APPEND_TABLES
val: ['SCHEMA1_table_1', 'SCHEMA1_table_2', 'SCHEMA1_table_3', ... ]

key: STD_MAIN_SCHEMA1_SCD2_TABLES
val: ['SCHEMA1_table_i', 'SCHEMA1_table_j', 'SCHEMA1_table_k', ... ]

key: STD_MAIN_SCHEMA2_SCD2_TABLES
val: ['SCHEMA2_table_l', 'SCHEMA2_table_m', 'SCHEMA2_table_n', ... ]

key: STD_SECONDARY_SCHEMA2_TRUNCATE_LOAD_TABLES
val: ['SCHEMA2_table_x', 'SCHEMA2_table_y', 'SCHEMA2_table_z', ... ]

DAG 生成器示例:

# DAG_TYPE = STD
env_vars                                = Variable.get('environment_variables')

airflow_var_name__dag_typed_schema_name = '_'.join([x for x in [DAG_TYPE, 'SCHEMA_NAMES'] if x])
table_types                             = ['INSERT_APPEND', 'TRUNCATE_LOAD', 'SCD1', 'SCD2']

list_of_schemas_with_group              = ast.literal_eval(Variable.get(airflow_var_name__dag_typed_schema_name, '[]'))
tuples_of_var_names                     = [(x[0], x[1], y, '_'.join([z for z in [DAG_TYPE, x[1], x[0], y, 'TABLES'] if z])) for x in list_of_schemas_with_group for y in table_types]
list_of_tables                          = [(x[0], x[1], x[2], ast.literal_eval(Variable.get(x[3], 'None'))) for x in tuples_of_var_names]
list_of_tables                          = [(x[0], x[1], x[2], x[3]) for x in list_of_tables if x[3] and len(x[3]) > 0]


for schema_name, namespace_group, table_type, table_names_with_schema_prefix in list_of_tables:
    for table_name in table_names_with_schema_prefix:

        dag_id = str(table_name)
        globals()[dag_id] = create_dag( dag_id,
                                        schedule,
                                        default_dag_args,
                                        schema_name,
                                        table_type,
                                        env_vars,
                                        tags )

Is Airflow's Database designed for this high number of reads (of Airflow Variables and mainly values from XCOM)?

是的,但您分享的代码存在滥用行为。您正在顶级代码中使用 Variable.get()。这意味着每次解析 .py 文件时,Airflow 都会执行一个 Variable.get() 以打开与数据库的会话。假设您没有更改默认值 (min_file_process_interval),这意味着您每 30 秒对每个 DAG 执行一次 Variable.get()

用数字表示,您提到您有 2000 个 DAG,每个 DAG 进行约 30-50 Variable.get() 次调用,这意味着您每 30 秒对数据库进行 6000-10000 次调用。这太虐了。

如果您希望在顶级代码中使用变量,您应该使用环境变量而不是 Airflow 变量。 Dynamic DAGs with environment variables 文档对此进行了解释。

注意到 Airflow 提供了定义自定义 Secret Backend 的选项。

How should we redesign our code if there is a high number of dynamically calculated fields and metadata we have to pass between the tasks?

Airflow 可以处理大量数据。问题更多在于你如何编写 DAG.Should 对 Xcom table 的担忧,或者你是否更愿意将其存储在其他地方 Airflow 支持 custom Xcom backend.

Should we simply accept the fact that there is a heavy load on DB in this type of use case and simply scale the DB up vertically?

根据您的描述,您可以采取一些措施来改善这种情况。 Airflow 针对大量 dag 和任务(垂直规模和水平规模)进行了测试。如果您发现性能问题的证据,您可以通过向项目打开 Github Issue 来报告它。我