以编程方式更新 Airflow 中的连接时出现问题

Problem updating the connections in Airflow programatically

我正在尝试使用 python 更新 Airflow 连接。我创建了一个 python 函数,它从 API 获取身份验证令牌并更新 Airflow 中的额外连接字段。

我正在获取 json 格式的令牌,如下所示:

{
   "token" : token_value
}

下面是我正在使用的 python 代码的一部分

def set_token():
    # Get token from API & update the Airflow Variables
    Variable.set("token", str(auth_token))
    new_token = Variables.get("token")
    get_conn = Connection(conn_id="test_conn")
    auth_token = { "header" : new_token}
    get_conn.set_extra(str(auth_token))

但是当我 运行 任务时,气流连接中的额外字段没有得到更新。我可以看到我的变量正在更新但没有连接。谁能告诉我我缺少什么?

我怀疑您是否以正确的方式从 Airflow 的元数据库中获取连接。

  • 除此之外,如果您通过 Variable.get() method, shouldn't Connection be receiving the same treatment (although Connection class 获取 Variable 没有 get() 函数,则必须有解决方法)?
  • 这里您只是用给定的 conn_id 参数实例化 Connection 对象(并没有真正从数据库中获取 conn_id 的连接)

每当我必须利用底层 SQLAlchemy 模型时,我都会查看 cli.py。从 connections() 函数中获取线索,这是我认为应该起作用的

from airflow.models import Connection
from airflow.settings import Session
from airflow.utils.db import provide_session
from typing import List, Dict, Any, Optional
from sqlalchemy.orm import exc

@provide_session
def update_conn_extra(conn_id: str, new_extra: Any, session: Optional[Session] = None) -> Optional[Connection]:
    try:
        my_conn: Optional[Connection] = (session
                                         .query(Connection)
                                         .filter(Connection.conn_id == conn_id)
                                         .one())
    except exc.NoResultFound:
        my_conn: Optional[Connection] = None
    except exc.MultipleResultsFound:
        my_conn: Optional[Connection] = None
    if my_conn:
        my_conn.extra: Any = new_extra
        session.add(my_conn)
        session.commit()

请注意,这里我们只是简单地用更新的字段覆盖了连接(没有先删除现有的字段),我发现这是可行的。如果您遇到一些问题,您可以在使用 session.delete(my_conn)

写入更新的连接之前删除现有连接