SQLAlchemy ORM 转换为 pandas DataFrame
SQLAlchemy ORM conversion to pandas DataFrame
是否有将 SQLAlchemy <Query object>
转换为 pandas DataFrame 的解决方案?
Pandas 可以使用 pandas.read_sql
但这需要使用原始 SQL。我有两个想要避免它的原因:
- 我已经拥有使用 ORM 的一切(这本身就是一个很好的理由)并且
- 我正在使用 python 列表作为查询的一部分,例如:
db.session.query(Item).filter(Item.symbol.in_(add_symbols)
where Item
is my model class and add_symbols
is a list). This is the equivalent of SQL SELECT ... from ... WHERE ... IN
.
有什么可能吗?
以下在大多数情况下应该有效:
df = pd.read_sql(query.statement, query.session.bind)
有关参数的详细信息,请参阅 pandas.read_sql
文档。
如果你想编译一个带有参数和方言特定参数的查询,使用这样的东西:
c = query.statement.compile(query.session.bind)
df = pandas.read_sql(c.string, query.session.bind, params=c.params)
为了让新手pandas程序员更清楚这一点,这里有一个具体的例子,
pd.read_sql(session.query(Complaint).filter(Complaint.id == 2).statement,session.bind)
这里我们 select 来自投诉的投诉 table (sqlalchemy 模型是 Complaint),id = 2
所选的解决方案对我不起作用,因为我一直收到错误
AttributeError: 'AnnotatedSelect' object has no attribute 'lower'
我发现以下方法有效:
df = pd.read_sql_query(query.statement, engine)
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
engine = create_engine('postgresql://postgres:postgres@localhost:5432/DB', echo=False)
Base = declarative_base(bind=engine)
Session = sessionmaker(bind=engine)
session = Session()
conn = session.bind
class DailyTrendsTable(Base):
__tablename__ = 'trends'
__table_args__ = ({"schema": 'mf_analysis'})
company_code = Column(DOUBLE_PRECISION, primary_key=True)
rt_bullish_trending = Column(Integer)
rt_bearish_trending = Column(Integer)
rt_bullish_non_trending = Column(Integer)
rt_bearish_non_trending = Column(Integer)
gen_date = Column(Date, primary_key=True)
df_query = select([DailyTrendsTable])
df_data = pd.read_sql(rt_daily_query, con = conn)
为了完整起见:作为 Pandas-function read_sql_query()
, you can also use the Pandas-DataFrame-function from_records()
的替代方法来转换 structured or record ndarray to DataFrame
.
如果你,这会派上用场。已经在 SQLAlchemy 中执行了查询并且结果已经可用:
import pandas as pd
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session, sessionmaker
SQLALCHEMY_DATABASE_URI = 'postgresql://postgres:postgres@localhost:5432/my_database'
engine = create_engine(SQLALCHEMY_DATABASE_URI, pool_pre_ping=True, echo=False)
db = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))
Base = declarative_base(bind=engine)
class Currency(Base):
"""The `Currency`-table"""
__tablename__ = "currency"
__table_args__ = {"schema": "data"}
id = Column(Integer, primary_key=True, nullable=False)
name = Column(String(64), nullable=False)
# Defining the SQLAlchemy-query
currency_query = db.query(Currency).with_entities(Currency.id, Currency.name)
# Getting all the entries via SQLAlchemy
currencies = currency_query.all()
# We provide also the (alternate) column names and set the index here,
# renaming the column `id` to `currency__id`
df_from_records = pd.DataFrame.from_records(currencies
, index='currency__id'
, columns=['currency__id', 'name'])
print(df_from_records.head(5))
# Or getting the entries via Pandas instead of SQLAlchemy using the
# aforementioned function `read_sql_query()`. We can set the index-columns here as well
df_from_query = pd.read_sql_query(currency_query.statement, db.bind, index_col='id')
# Renaming the index-column(s) from `id` to `currency__id` needs another statement
df_from_query.index.rename(name='currency__id', inplace=True)
print(df_from_query.head(5))
此答案提供了一个使用 SQL Alchemy select
语句并返回 pandas 数据框的可重现示例。它基于内存 SQLite 数据库,因此任何人都可以在不安装数据库引擎的情况下复制它。
import pandas
from sqlalchemy import create_engine
from sqlalchemy import MetaData, Table, Column, Text
from sqlalchemy.orm import Session
定义 table 元数据并创建 table
engine = create_engine('sqlite://')
meta = MetaData()
meta.bind = engine
user_table = Table('user', meta,
Column("name", Text),
Column("full_name", Text))
user_table.create()
Insert部分数据进入user
table
stmt = user_table.insert().values(name='Bob', full_name='Sponge Bob')
with Session(engine) as session:
result = session.execute(stmt)
session.commit()
将 select 语句的结果读入 pandas 数据框
# Select data into a pandas data frame
stmt = user_table.select().where(user_table.c.name == 'Bob')
df = pandas.read_sql_query(stmt, engine)
df
Out:
name full_name
0 Bob Sponge Bob
如果使用SQL查询
def generate_df_from_sqlquery(query):
from pandas import DataFrame
query = db.session.execute(query)
df = DataFrame(query.fetchall())
if len(df) > 0:
df.columns = query.keys()
else:
columns = query.keys()
df = pd.DataFrame(columns=columns)
return df
profile_df = generate_df_from_sqlquery(profile_query)
使用 2.0 SQLalchemy
语法(在 1.4 中也可以使用标志 future=True
)看起来 pd.read_sql
还没有实现,它会引发:
NotImplementedError: This method is not implemented for SQLAlchemy 2.0.
这是一个悬而未决的问题,直到 pandas 2.0 才会解决,您可以找到一些关于此的信息 here and here。
我没有找到任何令人满意的解决方法,但有些人似乎使用了两种引擎配置,一种带有 future False 标志:
engine2 = create_engine(URL_string, echo=False, future=False)
如果您查询字符串,此解决方案还可以,但使用 ORM,我能做的最好的事情是自定义函数尚未优化,但它有效:
Conditions = session.query(ExampleTable)
def df_from_sql(query):
return pd.DataFrame({i:j.__dict__ for i,j in enumerate(query.all())},).T.drop(columns='_sa_instance_state')
df = df_from_sql(ExampleTable)
在 pd.read_sql 实施新语法之前,无论如何,此解决方案都是临时的。
是否有将 SQLAlchemy <Query object>
转换为 pandas DataFrame 的解决方案?
Pandas 可以使用 pandas.read_sql
但这需要使用原始 SQL。我有两个想要避免它的原因:
- 我已经拥有使用 ORM 的一切(这本身就是一个很好的理由)并且
- 我正在使用 python 列表作为查询的一部分,例如:
db.session.query(Item).filter(Item.symbol.in_(add_symbols)
whereItem
is my model class andadd_symbols
is a list). This is the equivalent of SQLSELECT ... from ... WHERE ... IN
.
有什么可能吗?
以下在大多数情况下应该有效:
df = pd.read_sql(query.statement, query.session.bind)
有关参数的详细信息,请参阅 pandas.read_sql
文档。
如果你想编译一个带有参数和方言特定参数的查询,使用这样的东西:
c = query.statement.compile(query.session.bind)
df = pandas.read_sql(c.string, query.session.bind, params=c.params)
为了让新手pandas程序员更清楚这一点,这里有一个具体的例子,
pd.read_sql(session.query(Complaint).filter(Complaint.id == 2).statement,session.bind)
这里我们 select 来自投诉的投诉 table (sqlalchemy 模型是 Complaint),id = 2
所选的解决方案对我不起作用,因为我一直收到错误
AttributeError: 'AnnotatedSelect' object has no attribute 'lower'
我发现以下方法有效:
df = pd.read_sql_query(query.statement, engine)
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
engine = create_engine('postgresql://postgres:postgres@localhost:5432/DB', echo=False)
Base = declarative_base(bind=engine)
Session = sessionmaker(bind=engine)
session = Session()
conn = session.bind
class DailyTrendsTable(Base):
__tablename__ = 'trends'
__table_args__ = ({"schema": 'mf_analysis'})
company_code = Column(DOUBLE_PRECISION, primary_key=True)
rt_bullish_trending = Column(Integer)
rt_bearish_trending = Column(Integer)
rt_bullish_non_trending = Column(Integer)
rt_bearish_non_trending = Column(Integer)
gen_date = Column(Date, primary_key=True)
df_query = select([DailyTrendsTable])
df_data = pd.read_sql(rt_daily_query, con = conn)
为了完整起见:作为 Pandas-function read_sql_query()
, you can also use the Pandas-DataFrame-function from_records()
的替代方法来转换 structured or record ndarray to DataFrame
.
如果你,这会派上用场。已经在 SQLAlchemy 中执行了查询并且结果已经可用:
import pandas as pd
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session, sessionmaker
SQLALCHEMY_DATABASE_URI = 'postgresql://postgres:postgres@localhost:5432/my_database'
engine = create_engine(SQLALCHEMY_DATABASE_URI, pool_pre_ping=True, echo=False)
db = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))
Base = declarative_base(bind=engine)
class Currency(Base):
"""The `Currency`-table"""
__tablename__ = "currency"
__table_args__ = {"schema": "data"}
id = Column(Integer, primary_key=True, nullable=False)
name = Column(String(64), nullable=False)
# Defining the SQLAlchemy-query
currency_query = db.query(Currency).with_entities(Currency.id, Currency.name)
# Getting all the entries via SQLAlchemy
currencies = currency_query.all()
# We provide also the (alternate) column names and set the index here,
# renaming the column `id` to `currency__id`
df_from_records = pd.DataFrame.from_records(currencies
, index='currency__id'
, columns=['currency__id', 'name'])
print(df_from_records.head(5))
# Or getting the entries via Pandas instead of SQLAlchemy using the
# aforementioned function `read_sql_query()`. We can set the index-columns here as well
df_from_query = pd.read_sql_query(currency_query.statement, db.bind, index_col='id')
# Renaming the index-column(s) from `id` to `currency__id` needs another statement
df_from_query.index.rename(name='currency__id', inplace=True)
print(df_from_query.head(5))
此答案提供了一个使用 SQL Alchemy select
语句并返回 pandas 数据框的可重现示例。它基于内存 SQLite 数据库,因此任何人都可以在不安装数据库引擎的情况下复制它。
import pandas
from sqlalchemy import create_engine
from sqlalchemy import MetaData, Table, Column, Text
from sqlalchemy.orm import Session
定义 table 元数据并创建 table
engine = create_engine('sqlite://')
meta = MetaData()
meta.bind = engine
user_table = Table('user', meta,
Column("name", Text),
Column("full_name", Text))
user_table.create()
Insert部分数据进入user
table
stmt = user_table.insert().values(name='Bob', full_name='Sponge Bob')
with Session(engine) as session:
result = session.execute(stmt)
session.commit()
将 select 语句的结果读入 pandas 数据框
# Select data into a pandas data frame
stmt = user_table.select().where(user_table.c.name == 'Bob')
df = pandas.read_sql_query(stmt, engine)
df
Out:
name full_name
0 Bob Sponge Bob
如果使用SQL查询
def generate_df_from_sqlquery(query):
from pandas import DataFrame
query = db.session.execute(query)
df = DataFrame(query.fetchall())
if len(df) > 0:
df.columns = query.keys()
else:
columns = query.keys()
df = pd.DataFrame(columns=columns)
return df
profile_df = generate_df_from_sqlquery(profile_query)
使用 2.0 SQLalchemy
语法(在 1.4 中也可以使用标志 future=True
)看起来 pd.read_sql
还没有实现,它会引发:
NotImplementedError: This method is not implemented for SQLAlchemy 2.0.
这是一个悬而未决的问题,直到 pandas 2.0 才会解决,您可以找到一些关于此的信息 here and here。
我没有找到任何令人满意的解决方法,但有些人似乎使用了两种引擎配置,一种带有 future False 标志:
engine2 = create_engine(URL_string, echo=False, future=False)
如果您查询字符串,此解决方案还可以,但使用 ORM,我能做的最好的事情是自定义函数尚未优化,但它有效:
Conditions = session.query(ExampleTable)
def df_from_sql(query):
return pd.DataFrame({i:j.__dict__ for i,j in enumerate(query.all())},).T.drop(columns='_sa_instance_state')
df = df_from_sql(ExampleTable)
在 pd.read_sql 实施新语法之前,无论如何,此解决方案都是临时的。