SQL Alchemy - Python 从 Oracle 迁移到 MySQL 的脚本
SQL Alchemy - Python script to migrate from Oracle to MySQL
我正在尝试使用 cx_Oracle 和 SQL Alchemy 执行从 Oracle 到 MySQL 的批量 extracts/loads。
我在网上找到了这个示例,它适用于大多数数据类型,但无法用于 Blob 数据类型:
https://vbaoverall.com/transfer-data-from-oracle-to-mysql-using-sqlalchemy-python/
我有大约 43 个表,其中大约 12 个具有 BLOB
数据类型。
import cx_Oracle
import pandas as pd
from sqlalchemy import create_engine
import pymysql
import warnings
warnings.filterwarnings('ignore')
# list out all 43 tables:
table_list = [
"FILE",
"ATTACHMENT",
"DOCUMENTS",
"USERS",
"INFO",
"ONE",
"TWO",
"THREE",
"FOUR",
"...."
]
# Set Oralce Connection
dsn_tns = cx_Oracle.makedsn('source.example.com', '1530', service_name='test')
oracle_connection = cx_Oracle.connect(user='root', password='toot', dsn=dsn_tns)
# Open Oracle cursor
cursor = oracle_connection.cursor()
# set mysql connection with foreign key checks
mysql_engine = create_engine("mysql+pymysql://root:toot@target.example.com:3306/target")
mysql_engine.execute("SET FOREIGN_KEY_CHECKS=0")
# loop thru tables:
for table in table_list:
# select from oracle
sql = "SELECT * FROM " + table
# read into pandas df
data=pd.read_sql(sql, oracle_connection)
# insert into mysql
mysql_engine.execute("TRUNCATE TABLE "+table)
data.to_sql(table, con=mysql_engine, if_exists='append', index=False, chunksize=10000)
print("{}: sucessfully inserted {} rows.".format(table, data.shape[0]))
# update foreign key checks
mysql_engine.execute("SET FOREIGN_KEY_CHECKS=1")
#close connection
oracle_connection.close()
mysql_engine.dispose()
这是我遇到的错误:
return "'%s'" % escape_string(str(value), mapping)
TypeError: __str__ returned non-string (type bytes)
感谢 @Gord Thompson,我发现我只需要指定 dtype=
import cx_Oracle
import pandas as pd
from sqlalchemy import create_engine
import sqlalchemy
import pymysql
import warnings
warnings.filterwarnings('ignore')
table_list = [
"FILE",
"ATTACHMENT",
"DOCUMENTS",
"USERS",
"INFO",
"ONE",
"TWO",
"THREE",
"FOUR",
"...."
]
# Set Oralce Connection
dsn_tns = cx_Oracle.makedsn('source.example.com', '1530', service_name='test')
oracle_connection = cx_Oracle.connect(user='root', password='toot', dsn=dsn_tns)
# Open Oracle cursor
cursor = oracle_connection.cursor()
# set mysql connection with foreign key checks
mysql_engine = create_engine("mysql+pymysql://root:toot@target.example.com:3306/target")
mysql_engine.execute("SET FOREIGN_KEY_CHECKS=0")
for table in table_list:
# select from oracle
sql = "SELECT * FROM " + table
# read into pandas df
data=pd.read_sql(sql, oracle_connection)
dtype = {}
if table == "ATTACHMENT":
dtype['FILE_CONTENT'] = sqlalchemy.types.PickleType
# insert into mysql
mysql_engine.execute("TRUNCATE TABLE "+table)
data.to_sql(table, con=mysql_engine, if_exists='append', index = False, chunksize =10000, dtype=dtype)
print("{}: sucessfully inserted {} rows.".format(table, data.shape[0]))
# update foreign key checks
mysql_engine.execute("SET FOREIGN_KEY_CHECKS=1")
#close connection
oracle_connection.close()
mysql_engine.dispose()
我正在尝试使用 cx_Oracle 和 SQL Alchemy 执行从 Oracle 到 MySQL 的批量 extracts/loads。
我在网上找到了这个示例,它适用于大多数数据类型,但无法用于 Blob 数据类型:
https://vbaoverall.com/transfer-data-from-oracle-to-mysql-using-sqlalchemy-python/
我有大约 43 个表,其中大约 12 个具有 BLOB
数据类型。
import cx_Oracle
import pandas as pd
from sqlalchemy import create_engine
import pymysql
import warnings
warnings.filterwarnings('ignore')
# list out all 43 tables:
table_list = [
"FILE",
"ATTACHMENT",
"DOCUMENTS",
"USERS",
"INFO",
"ONE",
"TWO",
"THREE",
"FOUR",
"...."
]
# Set Oralce Connection
dsn_tns = cx_Oracle.makedsn('source.example.com', '1530', service_name='test')
oracle_connection = cx_Oracle.connect(user='root', password='toot', dsn=dsn_tns)
# Open Oracle cursor
cursor = oracle_connection.cursor()
# set mysql connection with foreign key checks
mysql_engine = create_engine("mysql+pymysql://root:toot@target.example.com:3306/target")
mysql_engine.execute("SET FOREIGN_KEY_CHECKS=0")
# loop thru tables:
for table in table_list:
# select from oracle
sql = "SELECT * FROM " + table
# read into pandas df
data=pd.read_sql(sql, oracle_connection)
# insert into mysql
mysql_engine.execute("TRUNCATE TABLE "+table)
data.to_sql(table, con=mysql_engine, if_exists='append', index=False, chunksize=10000)
print("{}: sucessfully inserted {} rows.".format(table, data.shape[0]))
# update foreign key checks
mysql_engine.execute("SET FOREIGN_KEY_CHECKS=1")
#close connection
oracle_connection.close()
mysql_engine.dispose()
这是我遇到的错误:
return "'%s'" % escape_string(str(value), mapping)
TypeError: __str__ returned non-string (type bytes)
感谢 @Gord Thompson,我发现我只需要指定 dtype=
import cx_Oracle
import pandas as pd
from sqlalchemy import create_engine
import sqlalchemy
import pymysql
import warnings
warnings.filterwarnings('ignore')
table_list = [
"FILE",
"ATTACHMENT",
"DOCUMENTS",
"USERS",
"INFO",
"ONE",
"TWO",
"THREE",
"FOUR",
"...."
]
# Set Oralce Connection
dsn_tns = cx_Oracle.makedsn('source.example.com', '1530', service_name='test')
oracle_connection = cx_Oracle.connect(user='root', password='toot', dsn=dsn_tns)
# Open Oracle cursor
cursor = oracle_connection.cursor()
# set mysql connection with foreign key checks
mysql_engine = create_engine("mysql+pymysql://root:toot@target.example.com:3306/target")
mysql_engine.execute("SET FOREIGN_KEY_CHECKS=0")
for table in table_list:
# select from oracle
sql = "SELECT * FROM " + table
# read into pandas df
data=pd.read_sql(sql, oracle_connection)
dtype = {}
if table == "ATTACHMENT":
dtype['FILE_CONTENT'] = sqlalchemy.types.PickleType
# insert into mysql
mysql_engine.execute("TRUNCATE TABLE "+table)
data.to_sql(table, con=mysql_engine, if_exists='append', index = False, chunksize =10000, dtype=dtype)
print("{}: sucessfully inserted {} rows.".format(table, data.shape[0]))
# update foreign key checks
mysql_engine.execute("SET FOREIGN_KEY_CHECKS=1")
#close connection
oracle_connection.close()
mysql_engine.dispose()