没有工人的 MWAA
MWAA without workers
我目前对MWAA的理解
MWAA 将 Fargate 用于调度程序和工作程序。 MWAA 必须至少有 1 个调度程序,它必须是 Fargate,我假设 24/7 无休止的过程(以避免退役)。调度器 Fargate 机器也是 运行 Celery 执行器。
当被触发时,DAG 被 Executor 拆分成任务,每个任务被添加到队列中,由 Workers 从队列中提取。如果是 bash 运算符,任务将提交给 Fargate Worker 并在那里执行。
Workers 扩展由 Fargate 管理,您只需指定 min/max 计数。如果同时提交任务,则触发工作人员退役 - 它可能会失败(已知问题)。
问题
假设执行单任务DAG。任务在另一个 AWS 服务上执行,即 EMR (EmrAddStepsOperator
).
-
EmrAddStepsOperator
是否先提交给 MWAA Fargate Worker 然后再提交给 EMR?还是直接从 MWAA Scheduler 提交给 EMR?
- 如果我的所有任务都在 EMR 执行,我是否需要 MWAA 工作人员?
无论 MWAA / Google Cloud Composer / 其他什么,答案都是一样的。
Airflow 是协调器工具。任务 运行ning 在 Airflow workers 上,但任务的“核心”可以在另一个服务上执行。考虑执行一些 SQL 任务的用例。 SQL 的实际计算是在数据库上完成的,而不是在提交 SQL 作业的机器上完成的。 Airflow 必须为每个操作员创建一个任务,此任务必须 运行 在 Airflow worker 上。如果任务在 worker 本身上执行 computing/processing 或将作业提交给另一个服务并等待取决于任务本身的响应。
为了更好地解释:
execute()
任何操作员都必须实现的功能是 运行ning 在您的 Celery worker 上。在此函数中,可以有部分代码用于将作业提交给其他服务(如 EMR)。在这些情况下,您还可以选择是否要保留 Celery 工作人员直到外部服务 (EMR) returns 回答(同步方式)或释放 Celery 工作人员同时做其他事情(运行另一个任务)。这取决于运算符的实现方式。
所以回答你的问题:
当 Airflow 调度程序执行 EmrAddStepsOperator
时,将为它创建一个任务。该任务将由 CeleryExecutor
处理并发送给 Celery worker。当任务开始 运行ning 时,它将执行 add_job_flow_steps,然后才会将一个步骤提交给 EMR。
是的,因为 CeleryExecutor
向 Celery workers 提交任务。它不会 know/care 关于您的任务正在与哪些服务交互。就是说 - 如果您的任务只是将工作提交给其他服务,您可能不需要高 memory/cpu 的工作人员,因为工作人员不做重要的处理。
我目前对MWAA的理解
MWAA 将 Fargate 用于调度程序和工作程序。 MWAA 必须至少有 1 个调度程序,它必须是 Fargate,我假设 24/7 无休止的过程(以避免退役)。调度器 Fargate 机器也是 运行 Celery 执行器。
当被触发时,DAG 被 Executor 拆分成任务,每个任务被添加到队列中,由 Workers 从队列中提取。如果是 bash 运算符,任务将提交给 Fargate Worker 并在那里执行。
Workers 扩展由 Fargate 管理,您只需指定 min/max 计数。如果同时提交任务,则触发工作人员退役 - 它可能会失败(已知问题)。
问题
假设执行单任务DAG。任务在另一个 AWS 服务上执行,即 EMR (EmrAddStepsOperator
).
-
EmrAddStepsOperator
是否先提交给 MWAA Fargate Worker 然后再提交给 EMR?还是直接从 MWAA Scheduler 提交给 EMR? - 如果我的所有任务都在 EMR 执行,我是否需要 MWAA 工作人员?
无论 MWAA / Google Cloud Composer / 其他什么,答案都是一样的。
Airflow 是协调器工具。任务 运行ning 在 Airflow workers 上,但任务的“核心”可以在另一个服务上执行。考虑执行一些 SQL 任务的用例。 SQL 的实际计算是在数据库上完成的,而不是在提交 SQL 作业的机器上完成的。 Airflow 必须为每个操作员创建一个任务,此任务必须 运行 在 Airflow worker 上。如果任务在 worker 本身上执行 computing/processing 或将作业提交给另一个服务并等待取决于任务本身的响应。
为了更好地解释:
execute()
任何操作员都必须实现的功能是 运行ning 在您的 Celery worker 上。在此函数中,可以有部分代码用于将作业提交给其他服务(如 EMR)。在这些情况下,您还可以选择是否要保留 Celery 工作人员直到外部服务 (EMR) returns 回答(同步方式)或释放 Celery 工作人员同时做其他事情(运行另一个任务)。这取决于运算符的实现方式。
所以回答你的问题:
当 Airflow 调度程序执行
EmrAddStepsOperator
时,将为它创建一个任务。该任务将由CeleryExecutor
处理并发送给 Celery worker。当任务开始 运行ning 时,它将执行 add_job_flow_steps,然后才会将一个步骤提交给 EMR。是的,因为
CeleryExecutor
向 Celery workers 提交任务。它不会 know/care 关于您的任务正在与哪些服务交互。就是说 - 如果您的任务只是将工作提交给其他服务,您可能不需要高 memory/cpu 的工作人员,因为工作人员不做重要的处理。