读取 XCOM 和 Airflow 变量可能会减慢 Airflow(在 Google Cloud Composer 中)
Reading XCOM and Airflow variables probably slows down Airflow (in Google Cloud Composer)
我们正在尝试将每日 (CSV) 提取文件合并到我们的数据仓库中。
在我们的用例中,DAG 的 python 代码对于我们所有的 DAG (~2000) 都是相同的,因此我们通过 DAG 生成器逻辑从单个 python 文件。
在我们的 DAG 中,我们只有 15 个任务(5 个虚拟任务、2 个 CloudDataFusionStartPipelineOperator 任务、8 个 python 任务)。
在 DAG 生成过程中,我们读取气流变量 (~30-50) 以确定要生成的 DAG(这也确定了 DAG 的 ID 和它们应该处理的 schema/table 名称)。我们称这些为生成器变量。
在 DAG 生成过程中,DAG 还通过 ID 读取它们的配置(每个生成的 DAG 多 2-3 个 Airflow 变量)。我们称这些为配置器变量。
不幸的是,在我们的 DAG 中,我们必须处理一些 传递的参数(通过 REST API) 和任务之间的大量 动态计算信息 所以我们依赖 Airflow 的 XCOM 功能。这意味着 Airflow 数据库中的大量读取。
在可能的情况下,我们使用用户定义的宏来配置任务以延迟数据库读取的执行(XCOM 拉取的执行)直到任务被执行,但是它仍然给 Airflow (Google Cloud Composer) 带来沉重的负担。来自 XCOM 的大约 50 个拉动。
问题:
- Airflow 的数据库是否专为这种大量读取(Airflow 变量和主要来自 XCOM 的值)而设计?
- 如果我们必须在任务之间传递大量动态计算的字段和元数据,我们应该如何重新设计代码?
- 我们是否应该简单地接受这样一个事实,即在这种类型的用例中数据库负载很重,并简单地垂直扩展数据库?
XCOM 拉取示例:
Metadata = PythonOperator(
task_id = TASK_NAME_PREFIX__METADATA + str(dag_id),
python_callable = metadataManagment,
op_kwargs = {
'dag_id' : dag_id,
'execution_case' : '{{ ti.xcom_pull(task_ids="' + TASK_NAME_PREFIX__MANAGE_PARAMS + dag_id + '", key="execution_case_for_metadata") }}',
'date' : '{{ ti.xcom_pull(task_ids="' + TASK_NAME_PREFIX__MANAGE_PARAMS + dag_id + '", key="folder_date") }}',
'enc_path' : '{{ get_runtime_arg("RR", dag_run, "encryptedfilepath", ti.xcom_pull(task_ids="' + TASK_NAME_PREFIX__MANAGE_PARAMS + dag_id + '", key="folder_date")) }}',
'dec_path' : '{{ get_runtime_arg("RR", dag_run, "decryptedfilepath", ti.xcom_pull(task_ids="' + TASK_NAME_PREFIX__MANAGE_PARAMS + dag_id + '", key="folder_date")) }}',
'aggr_project_name': ast.literal_eval(AIRFLOW_ENVIRONMENT_VARIABLES)['aggr_project_name'],
},
provide_context = True,
trigger_rule = TriggerRule.ALL_DONE
)
发电机气流变量示例:
key: STD_SCHEMA_NAMES
val: [('SCHEMA1', 'MAIN'), ('SCHEMA2', 'MAIN'), ('SCHEMA2', 'SECONDARY')]
key: STD_MAIN_SCHEMA1_INSERT_APPEND_TABLES
val: ['SCHEMA1_table_1', 'SCHEMA1_table_2', 'SCHEMA1_table_3', ... ]
key: STD_MAIN_SCHEMA1_SCD2_TABLES
val: ['SCHEMA1_table_i', 'SCHEMA1_table_j', 'SCHEMA1_table_k', ... ]
key: STD_MAIN_SCHEMA2_SCD2_TABLES
val: ['SCHEMA2_table_l', 'SCHEMA2_table_m', 'SCHEMA2_table_n', ... ]
key: STD_SECONDARY_SCHEMA2_TRUNCATE_LOAD_TABLES
val: ['SCHEMA2_table_x', 'SCHEMA2_table_y', 'SCHEMA2_table_z', ... ]
DAG 生成器示例:
# DAG_TYPE = STD
env_vars = Variable.get('environment_variables')
airflow_var_name__dag_typed_schema_name = '_'.join([x for x in [DAG_TYPE, 'SCHEMA_NAMES'] if x])
table_types = ['INSERT_APPEND', 'TRUNCATE_LOAD', 'SCD1', 'SCD2']
list_of_schemas_with_group = ast.literal_eval(Variable.get(airflow_var_name__dag_typed_schema_name, '[]'))
tuples_of_var_names = [(x[0], x[1], y, '_'.join([z for z in [DAG_TYPE, x[1], x[0], y, 'TABLES'] if z])) for x in list_of_schemas_with_group for y in table_types]
list_of_tables = [(x[0], x[1], x[2], ast.literal_eval(Variable.get(x[3], 'None'))) for x in tuples_of_var_names]
list_of_tables = [(x[0], x[1], x[2], x[3]) for x in list_of_tables if x[3] and len(x[3]) > 0]
for schema_name, namespace_group, table_type, table_names_with_schema_prefix in list_of_tables:
for table_name in table_names_with_schema_prefix:
dag_id = str(table_name)
globals()[dag_id] = create_dag( dag_id,
schedule,
default_dag_args,
schema_name,
table_type,
env_vars,
tags )
Is Airflow's Database designed for this high number of reads (of Airflow Variables and mainly values from XCOM)?
是的,但您分享的代码存在滥用行为。您正在顶级代码中使用 Variable.get()
。这意味着每次解析 .py
文件时,Airflow 都会执行一个 Variable.get()
以打开与数据库的会话。假设您没有更改默认值 (min_file_process_interval),这意味着您每 30 秒对每个 DAG 执行一次 Variable.get()
。
用数字表示,您提到您有 2000 个 DAG,每个 DAG 进行约 30-50 Variable.get()
次调用,这意味着您每 30 秒对数据库进行 6000-10000 次调用。这太虐了。
如果您希望在顶级代码中使用变量,您应该使用环境变量而不是 Airflow 变量。 Dynamic DAGs with environment variables 文档对此进行了解释。
注意到 Airflow 提供了定义自定义 Secret Backend 的选项。
How should we redesign our code if there is a high number of dynamically calculated fields and metadata we have to pass between the tasks?
Airflow 可以处理大量数据。问题更多在于你如何编写 DAG.Should 对 Xcom table 的担忧,或者你是否更愿意将其存储在其他地方 Airflow 支持 custom Xcom backend.
Should we simply accept the fact that there is a heavy load on DB in this type of use case and simply scale the DB up vertically?
根据您的描述,您可以采取一些措施来改善这种情况。 Airflow 针对大量 dag 和任务(垂直规模和水平规模)进行了测试。如果您发现性能问题的证据,您可以通过向项目打开 Github Issue 来报告它。我
我们正在尝试将每日 (CSV) 提取文件合并到我们的数据仓库中。
在我们的用例中,DAG 的 python 代码对于我们所有的 DAG (~2000) 都是相同的,因此我们通过 DAG 生成器逻辑从单个 python 文件。 在我们的 DAG 中,我们只有 15 个任务(5 个虚拟任务、2 个 CloudDataFusionStartPipelineOperator 任务、8 个 python 任务)。
在 DAG 生成过程中,我们读取气流变量 (~30-50) 以确定要生成的 DAG(这也确定了 DAG 的 ID 和它们应该处理的 schema/table 名称)。我们称这些为生成器变量。
在 DAG 生成过程中,DAG 还通过 ID 读取它们的配置(每个生成的 DAG 多 2-3 个 Airflow 变量)。我们称这些为配置器变量。
不幸的是,在我们的 DAG 中,我们必须处理一些 传递的参数(通过 REST API) 和任务之间的大量 动态计算信息 所以我们依赖 Airflow 的 XCOM 功能。这意味着 Airflow 数据库中的大量读取。
在可能的情况下,我们使用用户定义的宏来配置任务以延迟数据库读取的执行(XCOM 拉取的执行)直到任务被执行,但是它仍然给 Airflow (Google Cloud Composer) 带来沉重的负担。来自 XCOM 的大约 50 个拉动。
问题:
- Airflow 的数据库是否专为这种大量读取(Airflow 变量和主要来自 XCOM 的值)而设计?
- 如果我们必须在任务之间传递大量动态计算的字段和元数据,我们应该如何重新设计代码?
- 我们是否应该简单地接受这样一个事实,即在这种类型的用例中数据库负载很重,并简单地垂直扩展数据库?
XCOM 拉取示例:
Metadata = PythonOperator(
task_id = TASK_NAME_PREFIX__METADATA + str(dag_id),
python_callable = metadataManagment,
op_kwargs = {
'dag_id' : dag_id,
'execution_case' : '{{ ti.xcom_pull(task_ids="' + TASK_NAME_PREFIX__MANAGE_PARAMS + dag_id + '", key="execution_case_for_metadata") }}',
'date' : '{{ ti.xcom_pull(task_ids="' + TASK_NAME_PREFIX__MANAGE_PARAMS + dag_id + '", key="folder_date") }}',
'enc_path' : '{{ get_runtime_arg("RR", dag_run, "encryptedfilepath", ti.xcom_pull(task_ids="' + TASK_NAME_PREFIX__MANAGE_PARAMS + dag_id + '", key="folder_date")) }}',
'dec_path' : '{{ get_runtime_arg("RR", dag_run, "decryptedfilepath", ti.xcom_pull(task_ids="' + TASK_NAME_PREFIX__MANAGE_PARAMS + dag_id + '", key="folder_date")) }}',
'aggr_project_name': ast.literal_eval(AIRFLOW_ENVIRONMENT_VARIABLES)['aggr_project_name'],
},
provide_context = True,
trigger_rule = TriggerRule.ALL_DONE
)
发电机气流变量示例:
key: STD_SCHEMA_NAMES
val: [('SCHEMA1', 'MAIN'), ('SCHEMA2', 'MAIN'), ('SCHEMA2', 'SECONDARY')]
key: STD_MAIN_SCHEMA1_INSERT_APPEND_TABLES
val: ['SCHEMA1_table_1', 'SCHEMA1_table_2', 'SCHEMA1_table_3', ... ]
key: STD_MAIN_SCHEMA1_SCD2_TABLES
val: ['SCHEMA1_table_i', 'SCHEMA1_table_j', 'SCHEMA1_table_k', ... ]
key: STD_MAIN_SCHEMA2_SCD2_TABLES
val: ['SCHEMA2_table_l', 'SCHEMA2_table_m', 'SCHEMA2_table_n', ... ]
key: STD_SECONDARY_SCHEMA2_TRUNCATE_LOAD_TABLES
val: ['SCHEMA2_table_x', 'SCHEMA2_table_y', 'SCHEMA2_table_z', ... ]
DAG 生成器示例:
# DAG_TYPE = STD
env_vars = Variable.get('environment_variables')
airflow_var_name__dag_typed_schema_name = '_'.join([x for x in [DAG_TYPE, 'SCHEMA_NAMES'] if x])
table_types = ['INSERT_APPEND', 'TRUNCATE_LOAD', 'SCD1', 'SCD2']
list_of_schemas_with_group = ast.literal_eval(Variable.get(airflow_var_name__dag_typed_schema_name, '[]'))
tuples_of_var_names = [(x[0], x[1], y, '_'.join([z for z in [DAG_TYPE, x[1], x[0], y, 'TABLES'] if z])) for x in list_of_schemas_with_group for y in table_types]
list_of_tables = [(x[0], x[1], x[2], ast.literal_eval(Variable.get(x[3], 'None'))) for x in tuples_of_var_names]
list_of_tables = [(x[0], x[1], x[2], x[3]) for x in list_of_tables if x[3] and len(x[3]) > 0]
for schema_name, namespace_group, table_type, table_names_with_schema_prefix in list_of_tables:
for table_name in table_names_with_schema_prefix:
dag_id = str(table_name)
globals()[dag_id] = create_dag( dag_id,
schedule,
default_dag_args,
schema_name,
table_type,
env_vars,
tags )
Is Airflow's Database designed for this high number of reads (of Airflow Variables and mainly values from XCOM)?
是的,但您分享的代码存在滥用行为。您正在顶级代码中使用 Variable.get()
。这意味着每次解析 .py
文件时,Airflow 都会执行一个 Variable.get()
以打开与数据库的会话。假设您没有更改默认值 (min_file_process_interval),这意味着您每 30 秒对每个 DAG 执行一次 Variable.get()
。
用数字表示,您提到您有 2000 个 DAG,每个 DAG 进行约 30-50 Variable.get()
次调用,这意味着您每 30 秒对数据库进行 6000-10000 次调用。这太虐了。
如果您希望在顶级代码中使用变量,您应该使用环境变量而不是 Airflow 变量。 Dynamic DAGs with environment variables 文档对此进行了解释。
注意到 Airflow 提供了定义自定义 Secret Backend 的选项。
How should we redesign our code if there is a high number of dynamically calculated fields and metadata we have to pass between the tasks?
Airflow 可以处理大量数据。问题更多在于你如何编写 DAG.Should 对 Xcom table 的担忧,或者你是否更愿意将其存储在其他地方 Airflow 支持 custom Xcom backend.
Should we simply accept the fact that there is a heavy load on DB in this type of use case and simply scale the DB up vertically?
根据您的描述,您可以采取一些措施来改善这种情况。 Airflow 针对大量 dag 和任务(垂直规模和水平规模)进行了测试。如果您发现性能问题的证据,您可以通过向项目打开 Github Issue 来报告它。我