Unable to get ECSOperator (Fargate) to work with Airflow
I get this error when running a task with ECSOperator on ECS Fargate in Airflow 1.10.1. The DAG code is available here.
[2019-04-15 15:57:36,960] {{models.py:1788}} ERROR - An error occurred
(InvalidParameterException) when calling the RunTask operation: Network
Configuration must be provided when networkMode 'awsvpc' is specified.
I'm not sure what is going wrong, because network_configuration is passed in the args dict, similar to what is done here: https://github.com/apache/airflow/blob/master/tests/contrib/operators/test_ecs_operator.py#L61
The network_configuration argument was only added to ECSOperator in Airflow v1.10.3, so on 1.10.1 it never reaches the RunTask call. I suggest upgrading Airflow to v1.10.3.
Reference:
https://github.com/apache/airflow/blob/1.10.3/airflow/contrib/operators/ecs_operator.py#L69
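
If you're not sure which Airflow version a worker is actually running, a quick check along these lines may help. This is only a sketch: `parse_version` and `supports_network_configuration` are hypothetical helper names, not part of Airflow, and the code assumes plain `X.Y.Z` style version strings. In a real environment you would pass in `airflow.__version__`.

```python
import re

# Per the answer above, network_configuration is available from this release.
MIN_VERSION = (1, 10, 3)


def parse_version(version):
    """Turn '1.10.3' (or '1.10.3rc1') into a tuple of ints such as (1, 10, 3)."""
    parts = []
    for piece in version.split(".")[:3]:
        match = re.match(r"\d+", piece)
        parts.append(int(match.group()) if match else 0)
    # Pad short versions such as '2.0' to three components.
    while len(parts) < 3:
        parts.append(0)
    return tuple(parts)


def supports_network_configuration(version):
    """Hypothetical helper: True if the version is at least 1.10.3."""
    return parse_version(version) >= MIN_VERSION


if __name__ == "__main__":
    for v in ("1.10.1", "1.10.3"):
        print(v, supports_network_configuration(v))
```

Comparing tuples of ints rather than raw strings matters here, since a plain string comparison would rank "1.10.10" below "1.10.3".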
Sample ECSOperator configuration for running a task on Fargate
Airflow version: v1.10.3
from airflow.contrib.operators.ecs_operator import ECSOperator


def get_ecs_operator_args(param):
    return dict(
        launch_type="FARGATE",
        # The name of your task as defined in ECS
        task_definition="my_automation_task",
        # The name of your ECS cluster
        cluster="my-cluster",
        # Required for Fargate, whose tasks use the awsvpc network mode
        network_configuration={
            'awsvpcConfiguration': {
                'securityGroups': ['sg-hijk', 'sg-abcd'],
                'subnets': ['subnet-lmn'],
                'assignPublicIp': "ENABLED"
            }
        },
        overrides={
            'containerOverrides': [
                {
                    'name': "my-container",
                    'command': ["python", "myCode.py", str(param)]
                }
            ]
        },
        region_name="us-east-1",
    )


ecs_args = get_ecs_operator_args("{{ dag_run.conf['name'] }}")
# `dag` is the DAG object defined elsewhere in your DAG file
my_operator = ECSOperator(task_id="task_0", dag=dag, **ecs_args)
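
Since the error is raised by the ECS RunTask API rather than by Airflow itself, it can be handy to sanity-check the dict before the DAG is deployed. Below is a rough sketch of such a check. `check_awsvpc_config` is a made-up helper, not an Airflow or boto3 function, and the checks only cover the fields used in the sample configuration above (`subnets`, `securityGroups`, `assignPublicIp`), not the full API contract.

```python
def check_awsvpc_config(network_configuration):
    """Hypothetical helper: return a list of problems found (empty if none)."""
    problems = []
    awsvpc = (network_configuration or {}).get("awsvpcConfiguration")
    if not isinstance(awsvpc, dict):
        return ["missing 'awsvpcConfiguration' key"]
    # At least one subnet has to be given for an awsvpc task.
    if not awsvpc.get("subnets"):
        problems.append("'subnets' must list at least one subnet ID")
    if not isinstance(awsvpc.get("securityGroups", []), list):
        problems.append("'securityGroups' must be a list")
    if awsvpc.get("assignPublicIp", "DISABLED") not in ("ENABLED", "DISABLED"):
        problems.append("'assignPublicIp' must be 'ENABLED' or 'DISABLED'")
    return problems


if __name__ == "__main__":
    sample = {
        "awsvpcConfiguration": {
            "securityGroups": ["sg-hijk", "sg-abcd"],
            "subnets": ["subnet-lmn"],
            "assignPublicIp": "ENABLED",
        }
    }
    print(check_awsvpc_config(sample))
```

Note that this only catches a malformed dict. It won't help on Airflow 1.10.1, where the argument is not forwarded to RunTask in the first place.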