MySQL 中的 SQLalchemy 批量更新工作非常缓慢
Sqlalchemy bulk update in MySQL works very slow
我正在使用 SQLAlchemy 1.0.0
,并想批量进行一些 UPDATE ONLY
(如果主键匹配则更新,否则什么都不做)查询。
我做了一些实验,发现批量更新看起来比批量插入或批量慢得多 upsert
。
你能帮我指出为什么它工作这么慢吗?或者有没有其他方法 way/idea 来制作 BULK UPDATE (not BULK UPSERT) with SQLAlchemy
?
下面是MYSQL中的table:
CREATE TABLE `test` (
`id` int(11) unsigned NOT NULL,
`value` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
以及测试代码:
from sqlalchemy import create_engine, text
import time
driver = 'mysql'
host = 'host'
user = 'user'
password = 'password'
database = 'database'
url = "{}://{}:{}@{}/{}?charset=utf8".format(driver, user, password, host, database)
engine = create_engine(url)
engine.connect()
engine.execute('TRUNCATE TABLE test')
num_of_rows = 1000
rows = []
for i in xrange(0, num_of_rows):
rows.append({'id': i, 'value': i})
print '--------- test insert --------------'
sql = '''
INSERT INTO test (id, value)
VALUES (:id, :value)
'''
start = time.time()
engine.execute(text(sql), rows)
end = time.time()
print 'Cost {} seconds'.format(end - start)
print '--------- test upsert --------------'
for r in rows:
r['value'] = r['id'] + 1
sql = '''
INSERT INTO test (id, value)
VALUES (:id, :value)
ON DUPLICATE KEY UPDATE value = VALUES(value)
'''
start = time.time()
engine.execute(text(sql), rows)
end = time.time()
print 'Cost {} seconds'.format(end - start)
print '--------- test update --------------'
for r in rows:
r['value'] = r['id'] * 10
sql = '''
UPDATE test
SET value = :value
WHERE id = :id
'''
start = time.time()
engine.execute(text(sql), rows)
end = time.time()
print 'Cost {} seconds'.format(end - start)
num_of_rows = 100时的输出:
--------- test insert --------------
Cost 0.568960905075 seconds
--------- test upsert --------------
Cost 0.569655895233 seconds
--------- test update --------------
Cost 20.0891299248 seconds
num_of_rows = 1000时的输出:
--------- test insert --------------
Cost 0.807548999786 seconds
--------- test upsert --------------
Cost 0.584554195404 seconds
--------- test update --------------
Cost 206.199367046 seconds
数据库服务器的网络延迟约为 500 毫秒。
看起来批量更新它发送并执行每个查询,而不是批量?
提前致谢。
即使数据库服务器(如您的情况)有非常糟糕的延迟,您也可以使用一个技巧来加快批量更新操作。您不是直接更新 table,而是使用 stage-table 非常快速地插入新数据,然后对 目的地-table。这还有一个好处,就是可以显着减少必须发送到数据库的语句数量。
这如何与更新一起使用?
假设你有一个 table entries
并且你一直有新数据进来,但你只想更新那些已经存储的数据。您创建了目的地副本-table entries_stage
,其中仅包含相关字段:
entries = Table('entries', metadata,
Column('id', Integer, autoincrement=True, primary_key=True),
Column('value', Unicode(64), nullable=False),
)
entries_stage = Table('entries_stage', metadata,
Column('id', Integer, autoincrement=False, unique=True),
Column('value', Unicode(64), nullable=False),
)
然后使用批量插入插入数据。如果你使用 MySQL 的多值插入语法,这可以进一步加速,SQLAlchemy 本身不支持这种语法,但可以毫不费力地构建。
INSERT INTO enries_stage (`id`, `value`)
VALUES
(1, 'string1'), (2, 'string2'), (3, 'string3'), ...;
最后,您使用阶段-table 中的值更新目标-table 的值,如下所示:
UPDATE entries e
JOIN entries_stage es ON e.id = es.id
SET e.value = es.value
WHERE e.value != es.value;
那就大功告成了。
插入呢?
这当然也可以加快插入速度。由于您已经在 stage-table 中拥有数据,您需要做的就是发出一个 INSERT INTO ... SELECT
语句,其中的数据不在 目的地-table还没有。
INSERT INTO entries (id, value)
SELECT FROM entries_stage es
LEFT JOIN entries e ON e.id = es.id
HAVING e.id IS NULL;
这样做的好处是您不必执行 INSERT IGNORE
、REPLACE
或 ON DUPLICATE KEY UPDATE
, 会增加您的主键,甚至如果他们什么都不做.
我正在使用 SQLAlchemy 1.0.0
,并想批量进行一些 UPDATE ONLY
(如果主键匹配则更新,否则什么都不做)查询。
我做了一些实验,发现批量更新看起来比批量插入或批量慢得多 upsert
。
你能帮我指出为什么它工作这么慢吗?或者有没有其他方法 way/idea 来制作 BULK UPDATE (not BULK UPSERT) with SQLAlchemy
?
下面是MYSQL中的table:
CREATE TABLE `test` (
`id` int(11) unsigned NOT NULL,
`value` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
以及测试代码:
from sqlalchemy import create_engine, text
import time
driver = 'mysql'
host = 'host'
user = 'user'
password = 'password'
database = 'database'
url = "{}://{}:{}@{}/{}?charset=utf8".format(driver, user, password, host, database)
engine = create_engine(url)
engine.connect()
engine.execute('TRUNCATE TABLE test')
num_of_rows = 1000
rows = []
for i in xrange(0, num_of_rows):
rows.append({'id': i, 'value': i})
print '--------- test insert --------------'
sql = '''
INSERT INTO test (id, value)
VALUES (:id, :value)
'''
start = time.time()
engine.execute(text(sql), rows)
end = time.time()
print 'Cost {} seconds'.format(end - start)
print '--------- test upsert --------------'
for r in rows:
r['value'] = r['id'] + 1
sql = '''
INSERT INTO test (id, value)
VALUES (:id, :value)
ON DUPLICATE KEY UPDATE value = VALUES(value)
'''
start = time.time()
engine.execute(text(sql), rows)
end = time.time()
print 'Cost {} seconds'.format(end - start)
print '--------- test update --------------'
for r in rows:
r['value'] = r['id'] * 10
sql = '''
UPDATE test
SET value = :value
WHERE id = :id
'''
start = time.time()
engine.execute(text(sql), rows)
end = time.time()
print 'Cost {} seconds'.format(end - start)
num_of_rows = 100时的输出:
--------- test insert --------------
Cost 0.568960905075 seconds
--------- test upsert --------------
Cost 0.569655895233 seconds
--------- test update --------------
Cost 20.0891299248 seconds
num_of_rows = 1000时的输出:
--------- test insert --------------
Cost 0.807548999786 seconds
--------- test upsert --------------
Cost 0.584554195404 seconds
--------- test update --------------
Cost 206.199367046 seconds
数据库服务器的网络延迟约为 500 毫秒。
看起来批量更新它发送并执行每个查询,而不是批量?
提前致谢。
即使数据库服务器(如您的情况)有非常糟糕的延迟,您也可以使用一个技巧来加快批量更新操作。您不是直接更新 table,而是使用 stage-table 非常快速地插入新数据,然后对 目的地-table。这还有一个好处,就是可以显着减少必须发送到数据库的语句数量。
这如何与更新一起使用?
假设你有一个 table entries
并且你一直有新数据进来,但你只想更新那些已经存储的数据。您创建了目的地副本-table entries_stage
,其中仅包含相关字段:
entries = Table('entries', metadata,
Column('id', Integer, autoincrement=True, primary_key=True),
Column('value', Unicode(64), nullable=False),
)
entries_stage = Table('entries_stage', metadata,
Column('id', Integer, autoincrement=False, unique=True),
Column('value', Unicode(64), nullable=False),
)
然后使用批量插入插入数据。如果你使用 MySQL 的多值插入语法,这可以进一步加速,SQLAlchemy 本身不支持这种语法,但可以毫不费力地构建。
INSERT INTO enries_stage (`id`, `value`)
VALUES
(1, 'string1'), (2, 'string2'), (3, 'string3'), ...;
最后,您使用阶段-table 中的值更新目标-table 的值,如下所示:
UPDATE entries e
JOIN entries_stage es ON e.id = es.id
SET e.value = es.value
WHERE e.value != es.value;
那就大功告成了。
插入呢?
这当然也可以加快插入速度。由于您已经在 stage-table 中拥有数据,您需要做的就是发出一个 INSERT INTO ... SELECT
语句,其中的数据不在 目的地-table还没有。
INSERT INTO entries (id, value)
SELECT FROM entries_stage es
LEFT JOIN entries e ON e.id = es.id
HAVING e.id IS NULL;
这样做的好处是您不必执行 INSERT IGNORE
、REPLACE
或 ON DUPLICATE KEY UPDATE
, 会增加您的主键,甚至如果他们什么都不做.