以编程方式更新 Airflow 中的连接时出现问题
Problem updating the connections in Airflow programatically
我正在尝试使用 python 更新 Airflow 连接。我创建了一个 python 函数,它从 API 获取身份验证令牌并更新 Airflow 中的额外连接字段。
我正在获取 json 格式的令牌,如下所示:
{
"token" : token_value
}
下面是我正在使用的 python 代码的一部分
def set_token():
# Get token from API & update the Airflow Variables
Variable.set("token", str(auth_token))
new_token = Variables.get("token")
get_conn = Connection(conn_id="test_conn")
auth_token = { "header" : new_token}
get_conn.set_extra(str(auth_token))
但是当我 运行 任务时,气流连接中的额外字段没有得到更新。我可以看到我的变量正在更新但没有连接。谁能告诉我我缺少什么?
我怀疑您是否以正确的方式从 Airflow 的元数据库中获取连接。
- 除此之外,如果您通过
Variable.get()
method, shouldn't Connection
be receiving the same treatment (although Connection
class 获取 Variable
没有 get()
函数,则必须有解决方法)?
- 这里您只是用给定的
conn_id
参数实例化 Connection
对象(并没有真正从数据库中获取 conn_id
的连接)
每当我必须利用底层 SQLAlchemy
模型时,我都会查看 cli.py
。从 connections()
函数中获取线索,这是我认为应该起作用的
from airflow.models import Connection
from airflow.settings import Session
from airflow.utils.db import provide_session
from typing import List, Dict, Any, Optional
from sqlalchemy.orm import exc
@provide_session
def update_conn_extra(conn_id: str, new_extra: Any, session: Optional[Session] = None) -> Optional[Connection]:
try:
my_conn: Optional[Connection] = (session
.query(Connection)
.filter(Connection.conn_id == conn_id)
.one())
except exc.NoResultFound:
my_conn: Optional[Connection] = None
except exc.MultipleResultsFound:
my_conn: Optional[Connection] = None
if my_conn:
my_conn.extra: Any = new_extra
session.add(my_conn)
session.commit()
请注意,这里我们只是简单地用更新的字段覆盖了连接(没有先删除现有的字段),我发现这是可行的。如果您遇到一些问题,您可以在使用 session.delete(my_conn)
写入更新的连接之前删除现有连接
我正在尝试使用 python 更新 Airflow 连接。我创建了一个 python 函数,它从 API 获取身份验证令牌并更新 Airflow 中的额外连接字段。
我正在获取 json 格式的令牌,如下所示:
{
"token" : token_value
}
下面是我正在使用的 python 代码的一部分
def set_token():
# Get token from API & update the Airflow Variables
Variable.set("token", str(auth_token))
new_token = Variables.get("token")
get_conn = Connection(conn_id="test_conn")
auth_token = { "header" : new_token}
get_conn.set_extra(str(auth_token))
但是当我 运行 任务时,气流连接中的额外字段没有得到更新。我可以看到我的变量正在更新但没有连接。谁能告诉我我缺少什么?
我怀疑您是否以正确的方式从 Airflow 的元数据库中获取连接。
- 除此之外,如果您通过
Variable.get()
method, shouldn'tConnection
be receiving the same treatment (althoughConnection
class 获取Variable
没有get()
函数,则必须有解决方法)? - 这里您只是用给定的
conn_id
参数实例化Connection
对象(并没有真正从数据库中获取conn_id
的连接)
每当我必须利用底层 SQLAlchemy
模型时,我都会查看 cli.py
。从 connections()
函数中获取线索,这是我认为应该起作用的
from airflow.models import Connection
from airflow.settings import Session
from airflow.utils.db import provide_session
from typing import List, Dict, Any, Optional
from sqlalchemy.orm import exc
@provide_session
def update_conn_extra(conn_id: str, new_extra: Any, session: Optional[Session] = None) -> Optional[Connection]:
try:
my_conn: Optional[Connection] = (session
.query(Connection)
.filter(Connection.conn_id == conn_id)
.one())
except exc.NoResultFound:
my_conn: Optional[Connection] = None
except exc.MultipleResultsFound:
my_conn: Optional[Connection] = None
if my_conn:
my_conn.extra: Any = new_extra
session.add(my_conn)
session.commit()
请注意,这里我们只是简单地用更新的字段覆盖了连接(没有先删除现有的字段),我发现这是可行的。如果您遇到一些问题,您可以在使用 session.delete(my_conn)