SQLAlchemy bulk_insert_mappings(): 无法获取 table 'test' 的映射器
SQLAlchemy bulk_insert_mappings(): Could not get mapper for table 'test'
我一直在尝试使用 sqlalchemy 的 bulk_insert_mappings。我知道我可以创建会话并连接到数据库。我已经初始化了我的引擎,但我似乎无法从 table 中获得我需要的映射器。
from sqlalchemy import create_engine
from sqlalchemy.orm.session import sessionmaker,Session
from sqlalchemy_utils import get_mapper
engine = create_engine('mysql+pymysql://{}:{}@IP:PORT/'.format(USER,PW)) # removed my config here
connection = engine.connect()
m = MetaData(bind=engine,schema='test')
m.reflect()
Session = sessionmaker(bind=engine)
s = Session()
s.bulk_insert_mappings(get_mapper(m.tables['test.test']), pd.DataFrame({'a':['a','b','c']}).to_dict(orient="records"))
s.commit()
s.close()
我最近在SO上发现了一堆相关问题
SQLAlchemy get Mapper object from Table object (from Metadata or Session or otherwise)
但 sqlalchemy_utils.get_mapper 加注:
"ValueError: Could not get mapper for table 'test'."
sqlalchemy.orm.mapperlib._mapper_registry
似乎是空的。也许是因为我没有将它绑定到我的引擎。但不确定该怎么做。
PS: 测试是一个非常简单的单列table,类型为TEXT
这是 m.tables['test.test']
的输出
Table('test', MetaData(bind=Engine(mysql+pymysql://USER:***@IP:PORT/)), Column('a', TEXT(), table=<test>), schema='test')
我一直在谷歌搜索完全相同的问题。但是,我找到了解决此问题的方法。
class Helper():
pass
new_mapper = sqlalchemy.orm.mapper(Helper, local_table = m.tables['test.test'])
session.bulk_insert_mappings(new_mapper,
df.to_dict(orient="records"), return_defaults = False)
session.commit()
session.close()
根据 the following link,我认为 df.to_sql 在将大量数据帧插入 sql 表时表现非常差。然而,事实证明 bulk_insert_mappings 慢得多。
希望对你有帮助。
SQLAlchemy Mapper
的工作是:
Define the correlation of class attributes to database table columns.
... 它是 SQLAlchemy ORM 的基础。对于 ORM,Python classes 表示数据库中的 tables,并且需要某种机制将 class 上的属性与 [=93] 中的列相关联=].如果您不使用 ORM,您的 tables 不会映射到 Python classes,因此没有使用映射器。这就是为什么您从 get_mapper()
.
得到错误的原因
在你的例子中:
m = MetaData(bind=engine,schema='test')
m.reflect()
MetaData
是:
A collection of Table
objects and their associated schema constructs.
Automatically creates Table
entries in this MetaData
for any table available in the database but not yet present in the MetaData
.
所以此时,您有一个 Table
个对象的集合,您想要对其中一个对象执行批量插入。不要混淆 Table
对象与 ORM 映射 classes,它们不是一回事。
bulk_insert_mappings
状态下的文档:
Perform a bulk insert of the given list of mapping dictionaries.
和
The values within the dictionaries as given are typically passed without modification into Core Insert() constructs
您正在尝试实现数据的批量插入,我们可以跳过 ORM 方法(任何涉及 Session
的方法)并与核心显式交互。
表达式 pd.DataFrame({'a':['a','b','c']}).to_dict(orient="records")
returns dict
的列表,例如:[{'a': 'a'}, {'a': 'b'}, {'a': 'c'}]
,因此为了简单起见,我将使用此处的示例输出。
您的元数据对象中有 table,您已经使用 m.tables['test.test']
检索了该对象,并且该 Table
对象可用于生成其自己的插入语句:
print(m.tables['test.test'].insert())
# INSERT INTO test.test (a) VALUES (%(a)s)
要执行多个语句,我们可以将字典列表传递给 Connection.execute()
,如下所示。
ORM Session
的一个好处是它允许显式事务管理,您可以在必要时调用 Session.rollback()
或 Session.commit()
。连接对象也可以在类似于 Session
使用 Engine.begin()
.
的事务中显式操作
例如,使用上下文管理器:
with engine.begin() as conn:
conn.execute(
m.tables['test.test'].insert(),
*[{'a': 'a'}, {'a': 'b'}, {'a': 'c'}]
)
如果上下文中没有错误,这将自动提交查询,如果有错误则回滚。
引擎日志显示此表达式发出以下查询:
INSERT INTO test.test (a) VALUES (%(a)s)
({'a': 'a'}, {'a': 'b'}, {'a': 'c'})
以下人为设计的示例显示了您使用 Session.bulk_insert_mappings()
进行的原始查询。我必须创建一个 ORM 模型来表示 table 并向 table 添加一个 id
字段,因为 ORM 不喜欢在没有主键的情况下工作。
m = MetaData(bind=engine,schema='test')
Base = declarative_base(metadata=m)
class Test(Base):
__tablename__ = 'test'
id = Column(Integer, primary_key=True)
a = Column(Text)
Session = sessionmaker(bind=engine)
s = Session()
s.bulk_insert_mappings(get_mapper(m.tables['test.test']), pd.DataFrame({'a':['a','b','c']}).to_dict(orient="records"))
s.commit()
s.close()
这是引擎日志中执行的查询:
INSERT INTO test.test (a) VALUES (%(a)s)
({'a': 'a'}, {'a': 'b'}, {'a': 'c'})
您会注意到,这与我们通过直接使用核心实现的查询完全相同。
我一直在尝试使用 sqlalchemy 的 bulk_insert_mappings。我知道我可以创建会话并连接到数据库。我已经初始化了我的引擎,但我似乎无法从 table 中获得我需要的映射器。
from sqlalchemy import create_engine
from sqlalchemy.orm.session import sessionmaker,Session
from sqlalchemy_utils import get_mapper
engine = create_engine('mysql+pymysql://{}:{}@IP:PORT/'.format(USER,PW)) # removed my config here
connection = engine.connect()
m = MetaData(bind=engine,schema='test')
m.reflect()
Session = sessionmaker(bind=engine)
s = Session()
s.bulk_insert_mappings(get_mapper(m.tables['test.test']), pd.DataFrame({'a':['a','b','c']}).to_dict(orient="records"))
s.commit()
s.close()
我最近在SO上发现了一堆相关问题
SQLAlchemy get Mapper object from Table object (from Metadata or Session or otherwise)
但 sqlalchemy_utils.get_mapper 加注:
"ValueError: Could not get mapper for table 'test'."
sqlalchemy.orm.mapperlib._mapper_registry
似乎是空的。也许是因为我没有将它绑定到我的引擎。但不确定该怎么做。
PS: 测试是一个非常简单的单列table,类型为TEXT
这是 m.tables['test.test']
的输出Table('test', MetaData(bind=Engine(mysql+pymysql://USER:***@IP:PORT/)), Column('a', TEXT(), table=<test>), schema='test')
我一直在谷歌搜索完全相同的问题。但是,我找到了解决此问题的方法。
class Helper():
pass
new_mapper = sqlalchemy.orm.mapper(Helper, local_table = m.tables['test.test'])
session.bulk_insert_mappings(new_mapper,
df.to_dict(orient="records"), return_defaults = False)
session.commit()
session.close()
根据 the following link,我认为 df.to_sql 在将大量数据帧插入 sql 表时表现非常差。然而,事实证明 bulk_insert_mappings 慢得多。 希望对你有帮助。
SQLAlchemy Mapper
的工作是:
Define the correlation of class attributes to database table columns.
... 它是 SQLAlchemy ORM 的基础。对于 ORM,Python classes 表示数据库中的 tables,并且需要某种机制将 class 上的属性与 [=93] 中的列相关联=].如果您不使用 ORM,您的 tables 不会映射到 Python classes,因此没有使用映射器。这就是为什么您从 get_mapper()
.
在你的例子中:
m = MetaData(bind=engine,schema='test')
m.reflect()
MetaData
是:
A collection of
Table
objects and their associated schema constructs.
Automatically creates
Table
entries in thisMetaData
for any table available in the database but not yet present in theMetaData
.
所以此时,您有一个 Table
个对象的集合,您想要对其中一个对象执行批量插入。不要混淆 Table
对象与 ORM 映射 classes,它们不是一回事。
bulk_insert_mappings
状态下的文档:
Perform a bulk insert of the given list of mapping dictionaries.
和
The values within the dictionaries as given are typically passed without modification into Core Insert() constructs
您正在尝试实现数据的批量插入,我们可以跳过 ORM 方法(任何涉及 Session
的方法)并与核心显式交互。
表达式 pd.DataFrame({'a':['a','b','c']}).to_dict(orient="records")
returns dict
的列表,例如:[{'a': 'a'}, {'a': 'b'}, {'a': 'c'}]
,因此为了简单起见,我将使用此处的示例输出。
您的元数据对象中有 table,您已经使用 m.tables['test.test']
检索了该对象,并且该 Table
对象可用于生成其自己的插入语句:
print(m.tables['test.test'].insert())
# INSERT INTO test.test (a) VALUES (%(a)s)
要执行多个语句,我们可以将字典列表传递给 Connection.execute()
,如下所示。
ORM Session
的一个好处是它允许显式事务管理,您可以在必要时调用 Session.rollback()
或 Session.commit()
。连接对象也可以在类似于 Session
使用 Engine.begin()
.
例如,使用上下文管理器:
with engine.begin() as conn:
conn.execute(
m.tables['test.test'].insert(),
*[{'a': 'a'}, {'a': 'b'}, {'a': 'c'}]
)
如果上下文中没有错误,这将自动提交查询,如果有错误则回滚。
引擎日志显示此表达式发出以下查询:
INSERT INTO test.test (a) VALUES (%(a)s)
({'a': 'a'}, {'a': 'b'}, {'a': 'c'})
以下人为设计的示例显示了您使用 Session.bulk_insert_mappings()
进行的原始查询。我必须创建一个 ORM 模型来表示 table 并向 table 添加一个 id
字段,因为 ORM 不喜欢在没有主键的情况下工作。
m = MetaData(bind=engine,schema='test')
Base = declarative_base(metadata=m)
class Test(Base):
__tablename__ = 'test'
id = Column(Integer, primary_key=True)
a = Column(Text)
Session = sessionmaker(bind=engine)
s = Session()
s.bulk_insert_mappings(get_mapper(m.tables['test.test']), pd.DataFrame({'a':['a','b','c']}).to_dict(orient="records"))
s.commit()
s.close()
这是引擎日志中执行的查询:
INSERT INTO test.test (a) VALUES (%(a)s)
({'a': 'a'}, {'a': 'b'}, {'a': 'c'})
您会注意到,这与我们通过直接使用核心实现的查询完全相同。