MWAA not finding aws_default connection

I just set up AWS MWAA (Managed Workflows for Apache Airflow) and am running a simple bash script in a DAG. While reading the task's logs, I noticed that by default the task looks for the aws_default connection and tries to use it, but doesn't find it.

I went to the Connections pane and set up the aws_default connection, but it still shows the same message in the logs.

Airflow Connection: aws_conn_id=aws_default

No credentials retrieved from Connection

*** Reading remote log from Cloudwatch log_group: airflow-mwaa-Task log_stream: dms-postgres-dialog-label-pg/start-replication-task/2021-11-22T13_00_00+00_00/1.log.
[2021-11-23 13:01:02,487] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,486] {{base_aws.py:368}} INFO - Airflow Connection: aws_conn_id=aws_default
[2021-11-23 13:01:02,657] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,656] {{base_aws.py:179}} INFO - No credentials retrieved from Connection
[2021-11-23 13:01:02,678] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,678] {{base_aws.py:87}} INFO - Creating session with aws_access_key_id=None region_name=us-east-1
[2021-11-23 13:01:02,772] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,772] {{base_aws.py:157}} INFO - role_arn is None

How do I get MWAA to recognize this connection?

My DAG:

from datetime import datetime, timedelta
import pendulum

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator

local_tz = pendulum.timezone("America/New_York")
start_date = datetime(2021, 11, 9, 8, tzinfo=local_tz)
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
with DAG(
    'dms-postgres-dialog-label-pg-test',
    default_args=default_args,
    description='',
    schedule_interval=timedelta(days=1),
    start_date=start_date,
    tags=['example'],
) as dag:

    t1 = BashOperator(
        task_id='start-replication-task',
        bash_command="""
        aws dms start-replication-task --replication-task-arn arn:aws:dms:us-east-1:blah --start-replication-task-type reload-target
        """,
    )

    t1

Edit: for now, I'm just importing a built-in function and using it to get the credentials. Example:

from airflow.hooks.base import BaseHook
conn = BaseHook.get_connection('aws_service_account')
...

print(conn.host)
print(conn.login)
print(conn.password)
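
Those values can be handed straight to boto3 if needed. A minimal sketch, assuming the connection stores the access key ID in login and the secret access key in password (the helper name is my own):

import boto3

from airflow.hooks.base import BaseHook

# Hypothetical helper: build a boto3 session from an Airflow connection.
# Assumes the access key ID is in `login` and the secret key in `password`.
def session_from_connection(conn_id):
    conn = BaseHook.get_connection(conn_id)
    return boto3.Session(
        aws_access_key_id=conn.login,
        aws_secret_access_key=conn.password,
        region_name='us-east-1',  # adjust to your region
    )

dms = session_from_connection('aws_service_account').client('dms')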

Updating this since I just heard back from AWS support.

MWAA uses the execution role created for the environment in place of an access key ID and secret in aws_default. If you want to use a custom access key ID and secret, follow what @Jonathan Porter recommends in his answer to this question:

from airflow.hooks.base import BaseHook
conn = BaseHook.get_connection('aws_service_account')
...

print(conn.host)
print(conn.login)
print(conn.password)
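
Alternatively, the Amazon provider's base hook can resolve the stored keys itself instead of reading login/password by hand. A sketch, assuming the same custom aws_service_account connection from above:

from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook

# Point the hook at the custom connection instead of aws_default; the
# provider builds the boto3 client from the stored credentials.
hook = AwsBaseHook(aws_conn_id='aws_service_account', client_type='dms')
dms_client = hook.get_conn()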

However, if you want to use the execution role that MWAA provisions, that is the default behavior in MWAA. Confusingly, the info message states that no credentials were retrieved from the connection, but the execution role will still be used for things like the KubernetesPodOperator:

[2021-11-23 13:01:02,487] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,486] {{base_aws.py:368}} INFO - Airflow Connection: aws_conn_id=aws_default
[2021-11-23 13:01:02,657] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,656] {{base_aws.py:179}} INFO - No credentials retrieved from Connection
[2021-11-23 13:01:02,678] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,678] {{base_aws.py:87}} INFO - Creating session with aws_access_key_id=None region_name=us-east-1
[2021-11-23 13:01:02,772] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,772] {{base_aws.py:157}} INFO - role_arn is None

For example, the following uses the .aws/credentials that the execution role sets up automatically in the MWAA environment:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

default_args = {
    'owner': 'aws',
    'depends_on_past': False,
    'start_date': datetime(2019, 2, 20),
    'provide_context': True
}

dag = DAG(
    'kubernetes_pod_example', default_args=default_args, schedule_interval=None
)

# use a kube_config stored in the s3 dags folder for now
kube_config_path = '/usr/local/airflow/dags/kube_config.yaml'

podRun = KubernetesPodOperator(
    namespace="mwaa",
    image="ubuntu:18.04",
    cmds=["bash"],
    arguments=["-c", "ls"],
    labels={"foo": "bar"},
    name="mwaa-pod-test",
    task_id="pod-task",
    get_logs=True,
    dag=dag,
    is_delete_operator_pod=False,
    config_file=kube_config_path,
    in_cluster=False,
    cluster_context='aws',
    execution_timeout=timedelta(seconds=60)
)
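
The same execution role is also what the aws CLI call in the original BashOperator picks up. A quick way to confirm which identity a task actually runs under (a sketch; the task name is my own):

from airflow.operators.bash import BashOperator

# On MWAA this should print the environment's execution role, even though
# the log says "No credentials retrieved from Connection".
whoami = BashOperator(
    task_id='print-caller-identity',
    bash_command='aws sts get-caller-identity',
    dag=dag,
)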

Hope this helps anyone else who gets stuck.