Use temp table with SQLAlchemy
I'm trying to use a temp table with SQLAlchemy and join it against an existing table. This is what I have so far:
from datetime import date

import pandas as pd

# db is the Flask-SQLAlchemy instance and ExistingTable a mapped model (defined elsewhere)
def get_existing_ids():  # hypothetical wrapper name; the snippet is the body of a function
    engine = db.get_engine(db.app, 'MY_DATABASE')
    df = pd.DataFrame({"id": [1, 2, 3], "value": [100, 200, 300], "date": [date.today(), date.today(), date.today()]})
    temp_table = db.Table('#temp_table',
                          db.Column('id', db.Integer),
                          db.Column('value', db.Integer),
                          db.Column('date', db.DateTime))
    temp_table.create(engine)
    df.to_sql(name='tempdb.dbo.#temp_table',
              con=engine,
              if_exists='append',
              index=False)
    query = db.session.query(ExistingTable.id).join(temp_table, temp_table.c.id == ExistingTable.id)
    out_df = pd.read_sql(query.statement, engine)
    temp_table.drop(engine)
    return out_df.to_dict('records')
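This doesn't return any results, because the insert statements issued by to_sql don't get run (I think this is because they are run using sp_prepexec, but I'm not entirely sure about that).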
I then tried writing out the SQL statements by hand (CREATE TABLE #temp_table..., INSERT INTO #temp_table..., SELECT [id] FROM...) and running pd.read_sql(query, engine). I get the error message
This result object does not return rows. It has been closed automatically.
I guess this is because the statement does more than just a SELECT?
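How can I fix this? Either approach would be fine, although the first one is preferable because it avoids hard-coded SQL. To be clear, I can't modify the schema of the existing database; it's a vendor database.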
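Some background on the constraint involved, offered as a hedged sketch rather than a diagnosis: a local temporary table is visible only to the connection (session) that created it, and `pd.read_sql` expects the statement it runs to return rows. The sketch below assumes an in-memory SQLite database as a stand-in for SQL Server (so it uses `CREATE TEMP TABLE` instead of `#temp_table`), and the table names and rows are made up for illustration. It keeps every step on one connection and sends the CREATE and INSERT statements separately, so only the final SELECT has to return rows.

```python
import sqlalchemy as sa

# Assumption: SQLite in memory stands in for SQL Server. In both, a temporary
# table is visible only to the connection (session) that created it.
engine = sa.create_engine("sqlite://")

with engine.begin() as conn:  # a single connection is used for every step below
    # Hypothetical stand-in for the vendor table.
    conn.execute(sa.text("CREATE TABLE existing_table (id INTEGER PRIMARY KEY, name TEXT)"))
    conn.execute(
        sa.text("INSERT INTO existing_table (id, name) VALUES (:id, :name)"),
        [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}, {"id": 4, "name": "d"}],
    )

    # DDL and DML go out as separate statements; none of them returns rows.
    conn.execute(sa.text("CREATE TEMP TABLE temp_table (id INTEGER, value INTEGER)"))
    conn.execute(
        sa.text("INSERT INTO temp_table (id, value) VALUES (:id, :value)"),
        [{"id": 1, "value": 100}, {"id": 2, "value": 200}, {"id": 3, "value": 300}],
    )

    # Only the final SELECT is expected to return rows.
    result = conn.execute(
        sa.text(
            "SELECT e.id FROM existing_table AS e "
            "JOIN temp_table AS t ON t.id = e.id "
            "ORDER BY e.id"
        )
    )
    matched_ids = [row.id for row in result]

print(matched_ids)
```

To get a DataFrame instead, `pd.read_sql` can be handed that same `conn` for the final SELECT, so the temp table is still in scope.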
You could try a different solution: a Process-Keyed Table.
A process-keyed table is simply a permanent table that serves as a temp table. To permit processes to use the table simultaneously, the table has an extra column to identify the process. The simplest way to do this is the global variable @@spid (@@spid is the process id in SQL Server).
...
One alternative for the process-key is to use a GUID (data type uniqueidentifier).
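A minimal sketch of the process-keyed idea with SQLAlchemy Core, using the GUID variant. Assumptions: you are allowed to create one extra permanent table somewhere the query can reach (the question notes the vendor schema itself cannot change), in-memory SQLite stands in for SQL Server, and the names `keyed_temp`, `process_key` and `matching_ids` are hypothetical.

```python
import uuid

import sqlalchemy as sa

metadata = sa.MetaData()

# Hypothetical permanent "process-keyed" table, created once and shared by all
# callers. process_key holds a GUID string here; on SQL Server it could be a
# UNIQUEIDENTIFIER column, or an INT filled from @@SPID.
keyed_temp = sa.Table(
    "keyed_temp",
    metadata,
    sa.Column("process_key", sa.String(36), nullable=False, index=True),
    sa.Column("id", sa.Integer, nullable=False),
    sa.Column("value", sa.Integer),
)

# Hypothetical stand-in for the vendor table.
existing_table = sa.Table(
    "existing_table",
    metadata,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("name", sa.String),
)

engine = sa.create_engine("sqlite://")  # assumption: SQLite stands in for SQL Server
metadata.create_all(engine)
with engine.begin() as conn:
    conn.execute(existing_table.insert(), [{"id": i, "name": f"row {i}"} for i in (1, 3, 4)])


def matching_ids(rows):
    """Store rows under a fresh process key, join on them, then delete them."""
    key = str(uuid.uuid4())  # identifies this caller's rows only
    with engine.begin() as conn:
        conn.execute(
            keyed_temp.insert(),
            [{"process_key": key, "id": i, "value": v} for i, v in rows],
        )
        stmt = (
            sa.select(existing_table.c.id)
            .join(keyed_temp, keyed_temp.c.id == existing_table.c.id)
            .where(keyed_temp.c.process_key == key)  # ignore other callers' rows
            .order_by(existing_table.c.id)
        )
        ids = conn.execute(stmt).scalars().all()
        # Clean up so the shared table does not keep growing.
        conn.execute(keyed_temp.delete().where(keyed_temp.c.process_key == key))
    return ids


print(matching_ids([(1, 100), (2, 200), (3, 300)]))
```

Because every query filters on its own key, concurrent callers can share the table without seeing each other's rows.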
If the number of records to insert into the temp table is small or moderate, one possibility is to use a literal subquery or a VALUES CTE instead of creating a temp table.
# MODEL
import sqlalchemy as sa
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class ExistingTable(Base):
    __tablename__ = 'existing_table'
    id = sa.Column(sa.Integer, primary_key=True)
    name = sa.Column(sa.String)
    # ...
Suppose the following data is what you would otherwise insert into the temp table:
import datetime

# This data is retrieved from another database and used for filtering
rows = [
    (1, 100, datetime.date(2017, 1, 1)),
    (3, 300, datetime.date(2017, 3, 1)),
    (5, 500, datetime.date(2017, 5, 1)),
]
Create a CTE or subquery containing that data:
stmts = [
    # @NOTE: optimization to reduce the size of the statement:
    # cast types only in the first row; the DB engine infers them for the other rows.
    # The column names of a UNION come from its first SELECT, so only row 0 needs labels.
    # (SQLAlchemy 1.4+/2.0 style: columns are passed to select() positionally;
    # 1.3 took a list instead, e.g. sa.select([...]).)
    sa.select(
        sa.cast(sa.literal(i), sa.Integer).label("id"),
        sa.cast(sa.literal(v), sa.Integer).label("value"),
        sa.cast(sa.literal(d), sa.DateTime).label("date"),
    ) if idx == 0 else
    sa.select(sa.literal(i), sa.literal(v), sa.literal(d))  # no type cast
    for idx, (i, v, d) in enumerate(rows)
]
subquery = sa.union_all(*stmts)

# Choose one option below.
# I personally prefer B because the CTE can be reused multiple times in the same query.
# subquery = subquery.alias("temp_table")  # option A
subquery = subquery.cte(name="temp_table")  # option B
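For the "VALUES" variant mentioned above, SQLAlchemy 1.4+ has a `values()` construct that renders a table value constructor instead of a chain of UNION ALL selects. A minimal sketch, assuming a lightweight `sa.table()` stand-in for `ExistingTable`; it only compiles the statement to a string, since not every backend accepts this derived-table syntax (SQL Server does).

```python
import datetime

import sqlalchemy as sa

# Lightweight stand-in for the mapped ExistingTable (only the join column).
existing_table = sa.table("existing_table", sa.column("id", sa.Integer))

rows = [
    (1, 100, datetime.date(2017, 1, 1)),
    (3, 300, datetime.date(2017, 3, 1)),
    (5, 500, datetime.date(2017, 5, 1)),
]

# values() renders roughly as "(VALUES (...), (...)) AS temp_table (id, value, date)".
temp_values = sa.values(
    sa.column("id", sa.Integer),
    sa.column("value", sa.Integer),
    sa.column("date", sa.Date),
    name="temp_table",
).data(rows)

query = sa.select(existing_table.c.id).join(
    temp_values, temp_values.c.id == existing_table.c.id
)

sql = str(query)  # generic SQL with bound-parameter placeholders
print(sql)
```

Each row still becomes bound parameters, so like the UNION ALL version this suits small to moderate row counts.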
Create the final query with the required joins and filters:
query = (
    session
    .query(ExistingTable.id)
    .join(subquery, subquery.c.id == ExistingTable.id)
    # .filter(subquery.c.date >= XXX_DATE)
)

# TEMP: Test result output
for res in query:
    print(res)
Finally, get the pandas DataFrame:
out_df = pd.read_sql(query.statement, engine)
result = out_df.to_dict('records')
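To check the whole answer end to end without SQL Server, here is a minimal sketch assuming in-memory SQLite, the same model and rows, and some made-up `existing_table` rows. Two deviations are labeled in the code: it uses the SQLAlchemy 1.4+/2.0 `select()` style, and the date column is bound as `sa.Date` without a CAST because SQLite has no native date type.

```python
import datetime

import sqlalchemy as sa
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class ExistingTable(Base):
    __tablename__ = "existing_table"
    id = sa.Column(sa.Integer, primary_key=True)
    name = sa.Column(sa.String)


engine = sa.create_engine("sqlite://")  # assumption: SQLite stands in for the vendor DB
Base.metadata.create_all(engine)

rows = [
    (1, 100, datetime.date(2017, 1, 1)),
    (3, 300, datetime.date(2017, 3, 1)),
    (5, 500, datetime.date(2017, 5, 1)),
]

with Session(engine) as session:
    # Made-up contents for the stand-in table.
    session.add_all([ExistingTable(id=i, name=f"row {i}") for i in (1, 2, 3, 4)])
    session.commit()

    # Deviation for SQLite: the date is a plain literal typed sa.Date (no CAST).
    stmts = [
        sa.select(
            sa.cast(sa.literal(i), sa.Integer).label("id"),
            sa.cast(sa.literal(v), sa.Integer).label("value"),
            sa.literal(d, sa.Date).label("date"),
        )
        if idx == 0
        else sa.select(sa.literal(i), sa.literal(v), sa.literal(d, sa.Date))
        for idx, (i, v, d) in enumerate(rows)
    ]
    temp_cte = sa.union_all(*stmts).cte(name="temp_table")

    query = (
        session.query(ExistingTable.id)
        .join(temp_cte, temp_cte.c.id == ExistingTable.id)
        .filter(temp_cte.c.date >= datetime.date(2017, 2, 1))
        .order_by(ExistingTable.id)
    )
    result = [row.id for row in query]

print(result)
```

Ids 1 and 3 survive the join, and the date filter keeps only id 3. In the answer's setup, `pd.read_sql(query.statement, engine)` turns the same statement into a DataFrame.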