SQLAlchemy bulk_insert_mappings(): 无法获取 table 'test' 的映射器

SQLAlchemy bulk_insert_mappings(): Could not get mapper for table 'test'

我一直在尝试使用 sqlalchemy 的 bulk_insert_mappings。我知道我可以创建会话并连接到数据库。我已经初始化了我的引擎,但我似乎无法从 table 中获得我需要的映射器。

from sqlalchemy import create_engine
from sqlalchemy.orm.session import sessionmaker,Session
from sqlalchemy_utils import get_mapper

engine = create_engine('mysql+pymysql://{}:{}@IP:PORT/'.format(USER,PW)) # removed my config here
connection = engine.connect()
m = MetaData(bind=engine,schema='test')
m.reflect()

Session = sessionmaker(bind=engine)
s = Session()
s.bulk_insert_mappings(get_mapper(m.tables['test.test']), pd.DataFrame({'a':['a','b','c']}).to_dict(orient="records"))
s.commit()
s.close()

我最近在SO上发现了一堆相关问题

SQLAlchemy get Mapper object from Table object (from Metadata or Session or otherwise)

但 sqlalchemy_utils.get_mapper 加注:

"ValueError: Could not get mapper for table 'test'."

sqlalchemy.orm.mapperlib._mapper_registry 似乎是空的。也许是因为我没有将它绑定到我的引擎。但不确定该怎么做。

PS: 测试是一个非常简单的单列table,类型为TEXT

这是 m.tables['test.test']

的输出
Table('test', MetaData(bind=Engine(mysql+pymysql://USER:***@IP:PORT/)), Column('a', TEXT(), table=<test>), schema='test')

我一直在谷歌搜索完全相同的问题。但是,我找到了解决此问题的方法。

class Helper():
   pass
new_mapper = sqlalchemy.orm.mapper(Helper, local_table = m.tables['test.test'])
session.bulk_insert_mappings(new_mapper, 
df.to_dict(orient="records"), return_defaults = False)
session.commit()
session.close()

根据 the following link,我认为 df.to_sql 在将大量数据帧插入 sql 表时表现非常差。然而,事实证明 bulk_insert_mappings 慢得多。 希望对你有帮助。

SQLAlchemy Mapper 的工作是:

Define the correlation of class attributes to database table columns.

... 它是 SQLAlchemy ORM 的基础。对于 ORM,Python classes 表示数据库中的 tables,并且需要某种机制将 class 上的属性与 [=93] 中的列相关联=].如果您不使用 ORM,您的 tables 不会映射到 Python classes,因此没有使用映射器。这就是为什么您从 get_mapper().

得到错误的原因

在你的例子中:

m = MetaData(bind=engine,schema='test')
m.reflect()

MetaData 是:

A collection of Table objects and their associated schema constructs.

MetaData.reflect

Automatically creates Table entries in this MetaData for any table available in the database but not yet present in the MetaData.

所以此时,您有一个 Table 个对象的集合,您想要对其中一个对象执行批量插入。不要混淆 Table 对象与 ORM 映射 classes,它们不是一回事。

bulk_insert_mappings 状态下的文档:

Perform a bulk insert of the given list of mapping dictionaries.

The values within the dictionaries as given are typically passed without modification into Core Insert() constructs

您正在尝试实现数据的批量插入,我们可以跳过 ORM 方法(任何涉及 Session 的方法)并与核心显式交互。

表达式 pd.DataFrame({'a':['a','b','c']}).to_dict(orient="records") returns dict 的列表,例如:[{'a': 'a'}, {'a': 'b'}, {'a': 'c'}],因此为了简单起见,我将使用此处的示例输出。

您的元数据对象中有 table,您已经使用 m.tables['test.test'] 检索了该对象,并且该 Table 对象可用于生成其自己的插入语句:

print(m.tables['test.test'].insert())
# INSERT INTO test.test (a) VALUES (%(a)s)

要执行多个语句,我们可以将字典列表传递给 Connection.execute(),如下所示。

ORM Session 的一个好处是它允许显式事务管理,您可以在必要时调用 Session.rollback()Session.commit()。连接对象也可以在类似于 Session 使用 Engine.begin().

的事务中显式操作

例如,使用上下文管理器:

with engine.begin() as conn:
    conn.execute(
        m.tables['test.test'].insert(),
        *[{'a': 'a'}, {'a': 'b'}, {'a': 'c'}]
    )

如果上下文中没有错误,这将自动提交查询,如果有错误则回滚。

引擎日志显示此表达式发出以下查询:

INSERT INTO test.test (a) VALUES (%(a)s)
({'a': 'a'}, {'a': 'b'}, {'a': 'c'})

以下人为设计的示例显示了您使用 Session.bulk_insert_mappings() 进行的原始查询。我必须创建一个 ORM 模型来表示 table 并向 table 添加一个 id 字段,因为 ORM 不喜欢在没有主键的情况下工作。

m = MetaData(bind=engine,schema='test')
Base = declarative_base(metadata=m)

class Test(Base):
    __tablename__ = 'test'
    id = Column(Integer, primary_key=True)
    a = Column(Text)


Session = sessionmaker(bind=engine)
s = Session()
s.bulk_insert_mappings(get_mapper(m.tables['test.test']), pd.DataFrame({'a':['a','b','c']}).to_dict(orient="records"))
s.commit()
s.close()

这是引擎日志中执行的查询:

INSERT INTO test.test (a) VALUES (%(a)s)
({'a': 'a'}, {'a': 'b'}, {'a': 'c'})

您会注意到,这与我们通过直接使用核心实现的查询完全相同。