Airflow + pandas read_sql_query() 提交
Airflow + pandas read_sql_query() with commit
问题
我可以使用 read_sql() 将 SQL 事务提交到数据库吗?
用例和背景
我有一个用例,我希望允许用户执行一些预定义的 SQL 并返回一个 pandas 数据帧。在某些情况下,此 SQL 将需要查询预填充的 table,而在其他情况下,此 SQL 将执行一个函数,该函数将写入 table 和那么 table 将被查询。
此逻辑当前包含在 Airflow DAG 的方法内部,以便使用 PostgresHook 来利用 Airflow 可访问的数据库连接信息——该方法最终在 PythonOperator 任务中调用。我通过测试了解到 PostgresHook 创建了一个 psycopg2 连接对象。
代码
from airflow.hooks.postgres_hook import PostgresHook
import pandas as pd
def create_df(job_id,other_unrelated_inputs):
conn = job_type_to_connection(job_type) # method that helps choose a database
sql = open('/sql_files/job_id_{}.sql'.format(job_id)) #chooses arbitrary SQL
sql_template = sql.read()
hook = PostgresHook(postgres_conn_id=conn) #connection information for alias is predefined elsewhere within Airflow
try:
hook_conn_obj = hook.get_conn()
print(type(hook_conn_obj)) # <class 'psycopg2.extensions.connection'>
# Runs SQL template with variables, but does not commit. Alternatively, have used hook.get_pandas_df(sql_template)
df = pd.io.sql.read_sql(sql_template, con = hook_conn_obj)
except:
#catches some errors#
return df
问题
目前,在执行 SQL 函数时,此代码会生成一个数据帧,但不会提交在 SQL 函数中所做的任何数据库更改。例如,更准确地说,如果 SQL 函数将一行插入 table,该事务将不会提交,并且该行不会出现在 table.[=16= 中]
尝试次数
我尝试了一些修复,但还是卡住了。我最近的努力是更改 read_sql 用来自动提交事务的 psycopg2 连接的自动提交属性。
我承认我无法弄清楚连接的属性何时会对 SQL 的执行产生影响。
我认识到另一种方法是复制 PostgresHook.run() 中的一些逻辑来提交,然后添加一些代码将结果推送到数据帧中,但它似乎更简洁,也更容易供未来支持人员使用如果可能,已经创建的方法。
我能找到的最类似的 SO 问题是 ,但我对独立于 Airflow 的解决方案很感兴趣。
编辑
...
try:
hook_conn_obj = hook.get_conn()
print(type(hook_conn_obj)) # <class 'psycopg2.extensions.connection'>
hook_conn_obj.autocommit = True
df = pd.io.sql.read_sql(sql_template, con = hook_conn_obj) # Runs SQL template with variables, but does not commit
except:
#catches some errors#
return df
这似乎有效。如果有人对实现此目标的更好方法有任何评论或想法,我仍然有兴趣从讨论中学习。
谢谢!
read_sql
不会提交,因为正如该方法名称所暗示的那样,目标是读取数据,而不是写入。这是 pandas
的不错设计选择。这很重要,因为它可以防止意外写入并允许有趣的场景,例如 运行 一个过程,读取它的效果但没有任何内容被持久化。 read_sql
的目的是阅读,而不是写作。直接表达意图是黄金标准原则。
表达您的意图的更明确的方式是在 fetchall
之前明确地 execute
(提交)。但是因为 pandas
没有提供从 cursor
对象中读取的简单方法,你会失去 read_sql
提供的轻松心态,并且必须自己创建 DataFrame。
所以总的来说,你的解决方案是好的,通过设置 autocommit=True
你表明你的数据库交互将持续存在,无论他们做什么,所以应该不会发生意外。读起来有点奇怪,但如果你将 sql_template
变量命名为 write_then_read_sql
或在文档字符串中进行解释,意图会更清楚。
我有一个类似的用例——使用 Pandas 将数据加载到 SQL 服务器,调用执行繁重工作并写入表的存储过程,然后将结果集捕获到新的数据框。
我通过使用上下文管理器并显式提交事务解决了这个问题:
# Connect to SQL Server
engine = sqlalchemy.create_engine('db_string')
with engine.connect() as connection:
# Write dataframe to table with replace
df.to_sql(name='myTable', con=connection, if_exists='replace')
with connection.begin() as transaction:
# Execute verification routine and capture results
df_processed = pandas.read_sql(sql='exec sproc', con=connection)
transaction.commit()
问题
我可以使用 read_sql() 将 SQL 事务提交到数据库吗?
用例和背景
我有一个用例,我希望允许用户执行一些预定义的 SQL 并返回一个 pandas 数据帧。在某些情况下,此 SQL 将需要查询预填充的 table,而在其他情况下,此 SQL 将执行一个函数,该函数将写入 table 和那么 table 将被查询。 此逻辑当前包含在 Airflow DAG 的方法内部,以便使用 PostgresHook 来利用 Airflow 可访问的数据库连接信息——该方法最终在 PythonOperator 任务中调用。我通过测试了解到 PostgresHook 创建了一个 psycopg2 连接对象。
代码
from airflow.hooks.postgres_hook import PostgresHook
import pandas as pd
def create_df(job_id,other_unrelated_inputs):
conn = job_type_to_connection(job_type) # method that helps choose a database
sql = open('/sql_files/job_id_{}.sql'.format(job_id)) #chooses arbitrary SQL
sql_template = sql.read()
hook = PostgresHook(postgres_conn_id=conn) #connection information for alias is predefined elsewhere within Airflow
try:
hook_conn_obj = hook.get_conn()
print(type(hook_conn_obj)) # <class 'psycopg2.extensions.connection'>
# Runs SQL template with variables, but does not commit. Alternatively, have used hook.get_pandas_df(sql_template)
df = pd.io.sql.read_sql(sql_template, con = hook_conn_obj)
except:
#catches some errors#
return df
问题
目前,在执行 SQL 函数时,此代码会生成一个数据帧,但不会提交在 SQL 函数中所做的任何数据库更改。例如,更准确地说,如果 SQL 函数将一行插入 table,该事务将不会提交,并且该行不会出现在 table.[=16= 中]
尝试次数
我尝试了一些修复,但还是卡住了。我最近的努力是更改 read_sql 用来自动提交事务的 psycopg2 连接的自动提交属性。
我承认我无法弄清楚连接的属性何时会对 SQL 的执行产生影响。
我认识到另一种方法是复制 PostgresHook.run() 中的一些逻辑来提交,然后添加一些代码将结果推送到数据帧中,但它似乎更简洁,也更容易供未来支持人员使用如果可能,已经创建的方法。
我能找到的最类似的 SO 问题是
编辑
...
try:
hook_conn_obj = hook.get_conn()
print(type(hook_conn_obj)) # <class 'psycopg2.extensions.connection'>
hook_conn_obj.autocommit = True
df = pd.io.sql.read_sql(sql_template, con = hook_conn_obj) # Runs SQL template with variables, but does not commit
except:
#catches some errors#
return df
这似乎有效。如果有人对实现此目标的更好方法有任何评论或想法,我仍然有兴趣从讨论中学习。
谢谢!
read_sql
不会提交,因为正如该方法名称所暗示的那样,目标是读取数据,而不是写入。这是 pandas
的不错设计选择。这很重要,因为它可以防止意外写入并允许有趣的场景,例如 运行 一个过程,读取它的效果但没有任何内容被持久化。 read_sql
的目的是阅读,而不是写作。直接表达意图是黄金标准原则。
表达您的意图的更明确的方式是在 fetchall
之前明确地 execute
(提交)。但是因为 pandas
没有提供从 cursor
对象中读取的简单方法,你会失去 read_sql
提供的轻松心态,并且必须自己创建 DataFrame。
所以总的来说,你的解决方案是好的,通过设置 autocommit=True
你表明你的数据库交互将持续存在,无论他们做什么,所以应该不会发生意外。读起来有点奇怪,但如果你将 sql_template
变量命名为 write_then_read_sql
或在文档字符串中进行解释,意图会更清楚。
我有一个类似的用例——使用 Pandas 将数据加载到 SQL 服务器,调用执行繁重工作并写入表的存储过程,然后将结果集捕获到新的数据框。
我通过使用上下文管理器并显式提交事务解决了这个问题:
# Connect to SQL Server
engine = sqlalchemy.create_engine('db_string')
with engine.connect() as connection:
# Write dataframe to table with replace
df.to_sql(name='myTable', con=connection, if_exists='replace')
with connection.begin() as transaction:
# Execute verification routine and capture results
df_processed = pandas.read_sql(sql='exec sproc', con=connection)
transaction.commit()