如何在没有 运行 连接中断的情况下在 Airflow 中将 DatabaseHook 对象与 PythonOperator 一起使用?
How to use DatabaseHook objects with PythonOperator in Airflow without running out of connections?
我正在尝试使用 Airflow Connections 存储我的数据库凭据并将它们与 PythonOperators 一起使用。我注意到,如果我将凭据传递给 PythonOperator,那么每个变量都会被记录下来,包括数据库密码。因此,根据下面的示例,我开始将连接对象本身传递给 PythonOperator。
但我现在遇到的问题是气流会产生大量这样的对象,即使这个 dag 只计划每天运行,导致经常出现达到连接限制的问题。 如何在不使用 Airflow 中数据脚本的大量连接的情况下将 PostgresHook 与 PythonOperator 一起使用?
import sys
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.postgres_hook import PostgresHook
try:
sys.path.append('/path/to/my/awesome/module/')
from awesome_module import function_1, function_1
except:
raise ImportError("Couldn't import awesome_module")
postgres_hook_object = PostgresHook("dedicated_bot_account")
with postgres_hook_object.get_conn() as con:
t1 = PythonOperator(
task_id = 'function_1',
python_callable = function_1,
dag = dag,
op_kwargs = {'conn':con}
)
t2 = PythonOperator(
task_id = 'function_2',
python_callable = function_2,
dag = dag,
op_args = [con, service]
)
我从 Airflow Slack 了解到 DAG 中的代码是 运行 调度程序的频率,因此每次调度程序刷新 DAG 时都会打开多个连接。
似乎最佳做法是确保仅在任务 运行 时间通过以下任一方式打开连接:
- 如果任务是在 DAG 中定义的,将连接打开代码移动到 Python 函数定义中
- 如果任务是在别处定义的,则在任务中打开连接。 请注意,如果通过明文作为变量传递连接信息,那么这将被记录
我正在尝试使用 Airflow Connections 存储我的数据库凭据并将它们与 PythonOperators 一起使用。我注意到,如果我将凭据传递给 PythonOperator,那么每个变量都会被记录下来,包括数据库密码。因此,根据下面的示例,我开始将连接对象本身传递给 PythonOperator。
但我现在遇到的问题是气流会产生大量这样的对象,即使这个 dag 只计划每天运行,导致经常出现达到连接限制的问题。 如何在不使用 Airflow 中数据脚本的大量连接的情况下将 PostgresHook 与 PythonOperator 一起使用?
import sys
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.postgres_hook import PostgresHook
try:
sys.path.append('/path/to/my/awesome/module/')
from awesome_module import function_1, function_1
except:
raise ImportError("Couldn't import awesome_module")
postgres_hook_object = PostgresHook("dedicated_bot_account")
with postgres_hook_object.get_conn() as con:
t1 = PythonOperator(
task_id = 'function_1',
python_callable = function_1,
dag = dag,
op_kwargs = {'conn':con}
)
t2 = PythonOperator(
task_id = 'function_2',
python_callable = function_2,
dag = dag,
op_args = [con, service]
)
我从 Airflow Slack 了解到 DAG 中的代码是 运行 调度程序的频率,因此每次调度程序刷新 DAG 时都会打开多个连接。
似乎最佳做法是确保仅在任务 运行 时间通过以下任一方式打开连接:
- 如果任务是在 DAG 中定义的,将连接打开代码移动到 Python 函数定义中
- 如果任务是在别处定义的,则在任务中打开连接。 请注意,如果通过明文作为变量传递连接信息,那么这将被记录