使用 SQLAlchemy 将数据从 MS SQL 迁移到 PostgreSQL

Data migration from MS SQL to PostgreSQL using SQLAlchemy

TL;DR

我想将数据从 MS SQL Server + ArcSDE 迁移到 PostgreSQL + PostGIS,最好使用 SQLAlchemy。


我正在使用 SQLAlchemy 1.0.11MS SQL 2012 迁移现有数据库到 PostgreSQL 9.2(计划升级到 9.5)。

我一直在阅读有关此内容的信息,并找到了几个不同的来源 (Tyler Lesmann, Inada Naoki, Stefan Urbanek, and Mathias Fussenegger),这些来源具有类似的方法来完成此任务:

  1. 连接到两个数据库
  2. 反映源数据库table
  3. 遍历 table 并针对每个 table
    1. 在目标数据库中创建一个相等的table
    2. 获取源中的行并将它们插入目标数据库

代码

这是一个使用上次参考中的代码的简短示例。

from sqlalchemy import create_engine, MetaData

src = create_engine('mssql://user:pass@host/database?driver=ODBC+Driver+13+for+SQL+Server')
dst = create_engine('postgresql://user:pass@host/database')

meta = MetaData()
meta.reflect(bind=src)

tables = meta.tables

for tbl in tables:
    data = src.execute(tables[tbl].select()).fetchall()
    if data:
        dst.execute(tables[tbl].insert(), data)

我知道同时获取所有行是个坏主意,可以使用迭代器或 fetchmany 来完成,但这不是我现在的问题。

问题 1

我的数据库中的所有四个示例都失败了。我得到的一个错误与 NVARCHAR:

类型的列有关
sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) type "nvarchar" does not exist
LINE 5:  "desigOperador" NVARCHAR(100) COLLATE "SQL_Latin1_General_C...
                         ^
 [SQL: '\nCREATE TABLE "Operators" (\n\t"idOperador" INTEGER NOT NULL, \n\t"idGrupo" INTEGER, \n\t"desigOperador" NVARCHAR(100) COLLATE "SQL_Latin1_General_CP1_CI_AS", \n\t"Rua" NVARCHAR(200) COLLATE "SQL_Latin1_General_CP1_CI_AS", \n\t"Localidade" NVARCHAR(200) COLLATE "SQL_Latin1_General_CP1_CI_AS", \n\t"codPostal" NVARCHAR(10) COLLATE "SQL_Latin1_General_CP1_CI_AS", \n\tdataini DATETIME, \n\tdataact DATETIME, \n\temail NVARCHAR(50) COLLATE "SQL_Latin1_General_CP1_CI_AS", \n\turl NVARCHAR(50) COLLATE "SQL_Latin1_General_CP1_CI_AS", \n\tPRIMARY KEY ("idOperador")\n)\n\n']

根据这个错误我的理解是PostgreSQL没有NVARCHAR而是VARCHAR,应该是等价的。我认为 SQLAlchemy 会自动将它们都映射到其抽象层中的 String,但在这种情况下它可能不会那样工作。

问题:我是否应该预先定义所有的classes/tables,例如在models.py中,以避免这样的错误?如果是这样,它将如何与给定(或其他)工作流程集成?

其实这个错误是从Urbanek的运行代码中得到的,我可以在其中指定我要复制哪些table。 运行 上面的例子让我...

问题 2

MS SQL 安装是一个使用 ArcSDE(空间数据库引擎)的地理数据库。因此,某些列属于 非默认 几何类型。在 PostgreSQL 方面,我使用的是 PostGIS 2.

尝试使用这些类型复制 table 时,我收到如下警告:

/usr/local/lib/python2.7/dist-packages/sqlalchemy/dialects/mssql/base.py:1791: SAWarning: Did not recognize type 'geometry' of column 'geom'
  (type, name))
/usr/local/lib/python2.7/dist-packages/sqlalchemy/dialects/mssql/base.py:1791: SAWarning: Did not recognize type 'geometry' of column 'shape'

随后出现另一个错误(这个错误实际上是在执行上面提供的代码时抛出的):

sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) relation "SDE_spatial_references" does not exist
LINE 1: INSERT INTO "SDE_spatial_references" (srid, description, aut...
                    ^

我认为它未能创建警告中提到的列,但在需要这些列的后续步骤中抛出了错误。

问题:问题是前一个问题的扩展:如何使用自定义(或在其他地方定义)类型进行迁移?

我知道 GeoAlchemy2 that can be used with PostGIS. GeoAlchemy supports MS SQL Server 2008, but in that case I guess I'm stuck with SQLAlchemy 0.8.4 (perhaps with less nice features). Also, I found here 可以使用 GeoAlchemy 定义的类型进行反射。但是,我的问题仍然存在。

可能相关

编辑

当我看到引用SDE_spatial_references的错误时,我认为它可能与ArcSDE有关,因为同一台机器还安装了ArcGIS for Server。然后我了解到MS SQL Server也有一些Spatial Data Types,然后我确认是这样。我编辑错了:数据库确实使用ArcSDE。

编辑 2

这里有一些我忘记包含的更多细节。

迁移不必使用 SQLAlchemy 完成。我认为这是个好主意,因为:

我尝试过但失败的其他事情(现在不记得确切的原因,但如果有任何答案提到它们,我会再次检查它们):

数据库详细信息:

我推荐此流程,其中包含两个重要的迁移步骤:

迁移架构

  • 转储源数据库模式,最好是跨数据工具(如 UML)的某种统一格式(此步骤和后续步骤将需要并且更容易使用 Toad Data Modeler or IBM Rational Rose)。
  • 在需要时使用 TDM 或 RR 将表定义从源类型更改为目标类型。例如在 postgres 中摆脱 varchar(n) 并坚持使用 text,除非您 特别需要应用程序在字符串长于 n 的数据层上崩溃。如果无法在数据建模工具中转换复杂类型,请忽略(暂时)几何等复杂类型。
  • 为目标数据库生成 DDL-file(再次提到的工具在这里很方便)。
  • 创建(并添加到表中)复杂类型,因为它们应该由目标 RDBMS 处理。尝试插入几个条目以确保数据类型一致。 将这些类型添加到您的 DDL 文件
  • 您可能还想在这里禁用外键约束等检查。

迁移数据

  1. 将简单表(即带有标量字段)转储到 CSV。
  2. Import simple tables data.
  3. 编写一段简单的代码到 select 来自源的复杂数据并将其插入目标(这比听起来容易,只需 select -> 映射属性 -> 插入)。不要在一个代码例程中为所有复杂类型编写迁移,保持简单,分而治之。
  4. 如果您在迁移模式时没有禁用检查,则可能需要对不同的表重复步骤 2 和 3(这就是禁用检查的原因:))。
  5. 启用检查。

这样您就可以将迁移过程拆分为简单的原子步骤,数据迁移第 3 步失败不会导致您返回架构迁移等。您可以只截断几个表,并在失败时重新运行数据导入。

这是我使用 SQLAlchemy 的解决方案。这是一篇长篇博文post,希望能采纳table,对大家有用

可能,这也适用于 sourcetarget 数据库的其他组合(除了 MS SQL Server 和 PostgreSQL, 分别), 尽管它们没有经过测试。

工作流程(长话短说;DR)

  1. 自动检查源并推导现有的 table 模型(这称为反射)。
  2. 导入先前定义的 table 模型,这些模型将用于在目标中创建新的 tables。
  3. 迭代 table 个模型(源和目标中都存在的模型)。
  4. 对于每个 table,从源中获取行块并将它们插入到目标中。

要求


详细步骤

1。连接到数据库

SQLAlchemy 将引擎调用到处理应用程序与实际数据库之间的连接的对象。因此,要连接到数据库,必须使用相应的连接字符串创建一个引擎。一个数据库的典型形式URL是:

dialect+driver://username:password@host:port/database

您可以在 SQLAlchemy documentation.

中看到一些连接示例 URL

一旦创建,引擎将不会建立连接,直到通过 .connect() 方法或调用依赖于此方法的操作(例如,.execute()).

con = ms_sql.connect()

2。定义并创建 tables

2.1 源数据库

源端的表已经定义好,可以使用table反射:

from sqlalchemy import MetaData

metadata = MetaData(source_engine)
metadata.reflect(bind=source_engine)

如果您尝试这样做,您可能会看到一些警告。例如,

SAWarning: Did not recognize type 'geometry' of column 'Shape'

那是因为 SQLAlchemy 不自动识别自定义类型。在我的具体案例中,这是因为 ArcSDE 类型。但是,当您只需要读取数据时,这不是问题。忽略这些警告即可。

在 table 反射之后,您可以通过该元数据对象访问现有的 table。

# see all the tables names
print list(metadata.tables)
# handle the table named 'Troco'
src_table = metadata.tables['Troco']
# see that table columns
print src_table.c

2.2 目标数据库

对于target,因为我们是在新建数据库,所以无法使用tables反射。但是,创建 table 个模型 through SQLAlchemy 并不复杂;事实上,它可能比编写纯 SQL.

更简单
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class SomeClass(Base):
    __tablename__ = 'some_table'
    id = Column(Integer, primary_key=True)
    name =  Column(String(50))
    Shape = Column(Geometry('MULTIPOLYGON', srid=102165))

在此示例中,有一列包含空间数据(感谢 GeoAlchemy2 在此处定义)。

现在,如果您有十分之一的 table,那么定义这么多 table 可能会令人费解、乏味或容易出错。幸运的是,有一个 sqlacodegen 工具可以读取现有数据库的结构并生成相应的 SQLAlchemy 模型代码。示例:

pip install sqlacodegen
sqlacodegen mssql:///some_local_db --outfile models.py

因为这里的目的只是迁移数据,而不是模式,您可以从源数据库创建模型,然后只adapt/correct将生成的代码迁移到目标数据库。

注意:它会生成混合的class模型和Table模型。阅读 了解此行为。

同样,您将看到关于无法识别的自定义数据类型的类似警告。这就是为什么我们现在必须编辑 models.py 文件并调整模型的原因之一。以下是一些需要调整的提示:

  • 具有自定义数据类型的列使用 NullType 定义。将它们替换为正确的类型,例如 GeoAlchemy2 的 Geometry。 定义 Geometry 时,传递正确的几何类型(线串、多线串、多边形等)和 SRID。
  • PostgreSQL 字符类型支持可变长度,SQLAlchemy 默认将 String 列映射到它们,因此我们可以替换所有 UnicodeString(...) 通过 String。请注意,在String中指定字符数不是必需的,也不可取(不要在此引用我的话),只需省略它们即可。
  • 您将不得不仔细检查,但可能所有 BIT 列实际上都是 Boolean.
  • 大多数数字类型(例如,Float(...)Numeric(...))以及字符类型,都可以简化为 Numeric。小心例外 and/or 某些特定情况。
  • 我注意到定义为索引的列存在一些问题 (index=True)。在我的例子中,因为模式将被迁移,所以现在不需要这些并且可以安全地删除。
  • 确保 table 和列名在两个数据库中相同(反映 table 和定义的模型),这是后续步骤的要求。

现在我们可以将模型和数据库连接在一起,并在目标端创建所有table。

Base.metadata.bind = postgres
Base.metadata.create_all()

请注意,默认情况下,.create_all() 不会触及现有的 table。如果您想重新创建数据或将数据插入现有 table,则需要事先 DROP

Base.metadata.drop_all()

3。获取数据

现在您可以从一侧复制数据,稍后将其粘贴到另一侧。基本上,您只需要为每个 table 发出一个 SELECT 查询。通过 SQLAlchemy ORM.

提供的抽象层,这是可能且容易做到的事情
data = ms_sql.execute(metadata.tables['TableName'].select()).fetchall()

然而,这还不够,您还需要更多的控制。其原因与 ArcSDE 有关。因为它使用专有格式,所以您可以检索数据但无法正确解析它。你会得到这样的东西:

(1, Decimal('0'), u' ', bytearray(b'\x01\x02\x00\x00\x00\x02\x00\x00\x00@\xb1\xbf\xec/\xf8\xf4\xc0\x80\nF%\x99(\xf9\xc0@\xe3\xa5\x9b\x94\xf6\xf4\xc0\x806\xab>\xc5%\xf9\xc0'))

此处的解决方法是将几何列转换为 Well Known Text (WKT) 格式。这种转换必须在数据库端进行。 ArcSDE 在那里,所以它知道如何转换它。因此,例如,在 TableName 中有一个包含名为 shape 的空间数据的列。所需的 SQL 语句应如下所示:

SELECT [TableName].[shape].STAsText() FROM [TableName]

这使用 .STAsText(),SQL 服务器的一种几何数据类型方法。

如果您不使用 ArcSDE,则不需要执行以下步骤:

  • 迭代 tables(仅那些在源和目标中都定义的),
  • 对于每个 table,查找几何列(预先列出)
  • 像上面那样构建一个 SQL 语句

创建语句后,SQLAlchemy 可以执行它。

result = ms_sql.execute(statement)

事实上,这实际上并没有获取数据(与 ORM 示例比较——注意缺少的 .fetchall() 调用)。为了解释,这里引用了 SQLAlchemy 文档中的一段话:

The returned result is an instance of ResultProxy, which references a DBAPI cursor and provides a largely compatible interface with that of the DBAPI cursor. The DBAPI cursor will be closed by the ResultProxy when all of its result rows (if any) are exhausted.

数据只会在插入之前被检索。

4。插入数据

连接已建立,table已创建,数据已准备好,现在让我们插入。与获取数据类似,SQLAlchemy 还允许通过其 ORM 将 INSERT 数据转换为给定的 table:

postgres_engine.execute(Base.metadata.tables['TableName'].insert(), data)

同样,这很容易,但由于非标准格式和错误数据,可能需要进一步操作。

4.1 匹配列

首先,将源列与目标列(属于同一 table)匹配存在一些问题——这可能与 Geometry 列有关。一种可能的解决方案是创建一个 Python 字典,它将源列中的值映射到目标列的键(名称)。

这是逐行执行的 -- 虽然它并不像人们想象的那么慢,因为实际插入将同时插入几行。因此,每行将有一个字典,而不是插入数据对象(这是一个元组列表;一个元组对应一行),您将插入一个字典列表。

这是单行的示例。获取的数据是一元组的列表,values是构建的字典。

# data
[(1, 6, None, None, 204, 1, True, False, 204, 1.0, 1.0, 1.0, False, None]
# values
[{'DateDeleted': None, 'sentidocirculacao': False, 'TempoPercursoMed': 1.0,
  'ExtensaoTroco': 204, 'OBJECTID': 229119, 'NumViasSentido': 1,
  'Deleted': False, 'TempoPercursoMin': 1.0, 'IdCentroOp': 6,
  'IDParagemInicio': None, 'IDParagemFim': None, 'TipoPavimento': True,
  'TempoPercursoMax': 1.0, 'IDTroco': 1, 'CorredorBusext': 204}]

注意 Python 词典没有排序,这就是为什么两个列表中的数字不在同一位置的原因。为简化起见,已从此示例中删除几何列。

4.2 固定几何体

如果没有发生此问题,可能不需要以前的解决方法:有时几何图形 stored/retrieved 类型错误。

在 MSSQL/ArcSDE 中,几何数据类型未指定存储的几何类型(即线、多边形等)。它只关心它是一个几何体。此信息存储在另一个(系统)table 中,称为 SDE_geometry_columns(请参阅该页面的底部)。但是,Postgres(实际上是 PostGIS)在定义几何列时需要几何类型。

这会导致空间数据以错误的几何类型存储。错误的意思是它与应有的不同。例如,查看 SDE_geometry_columns table(摘录):

f_table_name        geometry_type
TableName               9

geometry_type = 9对应ST_MULTILINESTRING。但是,TableName table 中的行存储(或接收)为 ST_LINESTRING。这种不匹配会在 Postgres 端引发错误。

作为解决方法,您可以在创建上述词典时编辑 WKT。例如,'LINESTRING (10 12, 20 22)' 转换为 MULTILINESTRING ((10 12, 20 22))'.

4.3 缺少 SRID

最后,如果您愿意保留SRID,您还需要在创建几何列时定义它们。

如果table模型中定义了SRID,在Postgres中插入数据时必须满足。问题在于,当使用 .STAsText() 方法将几何数据作为 WKT 获取时,您会丢失 SRID 信息。

幸运的是,PostGIS 支持包含 SRID 的 Extended-WKT (E-WKT) 格式。 此处的解决方案是在修复几何时包含 SRID。同样的例子,'LINESTRING (10 12, 20 22)' 被转换为 'SRID=102165;MULTILINESTRING ((10 12, 20 22))'.

4.4 获取和插入

一切就绪后,您就可以插入了。如前所述,只有现在才真正从源中检索数据。您可以在数据块(用户定义的数量)中执行此操作,例如,一次 1000 行。

虽然正确: 行数 = data.fetchmany(1000) 如果不是行: 休息 values = [{key: (val if key.lower() != "shape" else fix(val, 102165)) for key, val in zip(keys, row)} for row in rows] postgres_engine.execute(target_table.insert(), 值)

这里的 fix() 是一个函数,它将更正几何并将给定的 SRID 添加到几何列(在本例中,由 "shape" 的列名标识)——就像上述--,而values就是前面提到的字典列表。

结果

结果是模式和数据的副本,存在于 MS SQL 服务器 + ArcSDE 数据库中,进入 PostgreSQL + PostGIS 数据库。

这里有一些统计数据,来自我的用例,用于性能分析。两个数据库都在同一台机器上;代码是从不同的机器执行的,但在同一个本地网络中。

Tables   |   Geometry Column   |   Rows   |   Fixed Geometries   |   Insert Time
---------------------------------------------------------------------------------
Table 1      MULTILINESTRING      1114797             702              17min12s
Table 2            None            460874             ---               4min55s
Table 3      MULTILINESTRING       389485          389485               4min20s
Table 4        MULTIPOLYGON          4050            3993                   34s
Total                             3777964          871243              48min27s

我在尝试从 Oracle 9i to MySQL 迁移时遇到了同样的问题。

我构建了 etlalchemy 来解决这个问题,目前已经测试了迁移到 MySQL、PostgreSQL、SQL服务器、Oracle 和 SQLite。它利用了上述 RDBMS 的 SQLAlchemy 和 BULK CSV 导入功能(而且速度非常快!)。

安装(非 El-capitan):pip install etlalchemy

安装(El-capitan):pip install --ignore-installed etlalchemy

运行:

from etlalchemy import ETLAlchemySource, ETLAlchemyTarget
# Migrate from SQL Server onto PostgreSQL
src = ETLAlchemySource("mssql+pyodbc://user:passwd@DSN_NAME")
tgt = ETLAlchemyTarget("postgresql://user:passwd@hostname/dbname",
                          drop_database=True)
tgt.addSource(src)
tgt.migrate()