MySQL 在多个表上批量插入
MySQL bulk insert on multiple tables
我有一个 MySQL 数据库,其中包含 2 个表 products
和 product_variants
。一个产品有很多产品变体。这里有一个示例:
products
+----+------+
| id | name |
+----+------+
| 1 | Foo |
| 2 | Bar |
+----+------+
product_variants
+----+-------------+--------+
| id | product_id | value |
+----+-------------+--------+
| 10 | 1 | red |
| 11 | 1 | green |
| 12 | 1 | blue |
| 13 | 2 | red |
| 14 | 2 | yellow |
+----+-------------+--------+
现在我需要以最有效和最快的方式批量插入大量产品及其变体。我有一个 JSON 有很多这样的产品 (100k+):
[
{
"name": "Foo",
"variants": [{ "value": "red" }, { "value": "green" }, { "value": "blue" }]
},
{
"name": "Bar",
"variants": [{ "value": "red" }, { "value": "yellow" }]
},
...
]
我应该从中生成查询以插入产品。
我的想法是使用这样的 insert
查询:
INSERT INTO `products` (name) VALUES ("foo"), ("bar"), ...;
但是我不知道在 product_variants
:
的插入查询中使用什么 product_id
(外键)
INSERT INTO `product_variants` (product_id,value) VALUES (?,"red"), (?,"green"), ...;
(事务内的这些查询)
我想过手动指定产品 ID,从最后一个 ID 开始递增,但是当并发连接同时插入产品或 2 个或更多批量插入进程时我会出错 运行同时
我可以使用什么策略来实现我的目标?有没有标准的方法来做到这一点?
ps:如果可能的话我不想改变这两个表的结构。
您可以使用 last_insert_id()
从最后一条语句中获取最后生成的 ID。但是,如前所述,由于此仅获取语句的最后一个 ID,因此需要您单独处理每个产品。不过,您可以批量插入变体。但是从给定 JSON 的结构来看,我认为这使得遍历 JSON 变得更加容易。每个产品及其变体都应插入交易中,这样如果 INSERT
到产品 table 由于某种原因失败,则产品的变体不会添加到之前的产品中。
START TRANSACTION;
INSERT INTO products
(name)
VALUES ('Foo');
INSERT INTO product_variants
(product_id,
value)
VALUES (last_insert_id(),
'red'),
(last_insert_id(),
'green'),
(last_insert_id(),
'blue');
COMMIT;
START TRANSACTION;
INSERT INTO products
(name)
VALUES ('Bar');
INSERT INTO product_variants
(product_id,
value)
VALUES (last_insert_id(),
'red'),
(last_insert_id(),
'yellow');
COMMIT;
如果您在 table 中已经有了 JSON,那么它可能可以通过两个语句(非常有效地)完成:
INSERT INTO Products (name)
SELECT name
FROM origial_table; -- to get the product names
INSERT INTO Variants (product_id, `value`)
SELECT ( SELECT id FROM Products WHERE name = ot.name ),
`value`
FROM origial_table AS ot;
实际上,name
和 value
需要是 suitable JSON 表达式才能提取值。
如果您担心第一个 table 中有很多重复的 "products",请确保 UNIQUE(name)
。您可以通过此处描述的两步过程避免 "burning" id:mysql.rjweb.org/doc.php/staging_table#normalization
最后,我使用了一种使用 MySQL 函数 LAST_INSERT_ID()
的策略,就像 @sticky-bit sad 但使用批量插入(许多产品需要 1 个插入)要快得多。
我附加了一个简单的 Ruby 脚本来执行批量插入。一切似乎也适用于并发插入。
我已经 运行 带有标志 innodb_autoinc_lock_mode = 2
的脚本,一切看起来都不错,但我不知道是否有必要将标志设置为 1:
require 'active_record'
require 'benchmark'
require 'mysql2'
require 'securerandom'
ActiveRecord::Base.establish_connection(
adapter: 'mysql2',
host: 'localhost',
username: 'root',
database: 'test',
pool: 200
)
class ApplicationRecord < ActiveRecord::Base
self.abstract_class = true
end
class Product < ApplicationRecord
has_many :product_variants
end
class ProductVariant < ApplicationRecord
belongs_to :product
COLORS = %w[red blue green yellow pink orange].freeze
end
def migrate
ActiveRecord::Schema.define do
create_table(:products) do |t|
t.string :name
end
create_table(:product_variants) do |t|
t.references :product, null: false, foreign_key: true
t.string :color
end
end
end
def generate_data
d = []
100_000.times do
d << {
name: SecureRandom.alphanumeric(8),
product_variants: Array.new(rand(1..3)).map do
{ color: ProductVariant::COLORS.sample }
end
}
end
d
end
DATA = generate_data.freeze
def bulk_insert
# All inside a transaction
ActiveRecord::Base.transaction do
# Insert products
values = DATA.map { |row| "('#{row[:name]}')" }.join(',')
q = "INSERT INTO products (name) VALUES #{values}"
ActiveRecord::Base.connection.execute(q)
# Get last insert id
q = 'SELECT LAST_INSERT_ID()'
last_id, = ActiveRecord::Base.connection.execute(q).first
# Insert product variants
i = -1
values = DATA.map do |row|
i += 1
row[:product_variants].map { |subrow| "(#{last_id + i},'#{subrow[:color]}')" }
end.flatten.join(',')
q = "INSERT INTO product_variants (product_id,color) VALUES #{values}"
ActiveRecord::Base.connection.execute(q)
end
end
migrate
threads = []
# Spawn 100 threads that perform 200 single inserts each
100.times do
threads << Thread.new do
200.times do
Product.create(name: 'CONCURRENCY NOISE')
end
end
end
threads << Thread.new do
Benchmark.bm do |benchmark|
benchmark.report('Bulk') do
bulk_insert
end
end
end
threads.map(&:join)
在 运行 脚本之后,我检查了所有产品都具有与查询关联的变体
SELECT *
FROM products
LEFT OUTER JOIN product_variants
ON (products.id = product_variants.product_id)
WHERE product_variants.product_id IS NULL
AND name != "CONCURRENCY NOISE";
正确的是我没有得到任何行。
我有一个 MySQL 数据库,其中包含 2 个表 products
和 product_variants
。一个产品有很多产品变体。这里有一个示例:
products
+----+------+
| id | name |
+----+------+
| 1 | Foo |
| 2 | Bar |
+----+------+
product_variants
+----+-------------+--------+
| id | product_id | value |
+----+-------------+--------+
| 10 | 1 | red |
| 11 | 1 | green |
| 12 | 1 | blue |
| 13 | 2 | red |
| 14 | 2 | yellow |
+----+-------------+--------+
现在我需要以最有效和最快的方式批量插入大量产品及其变体。我有一个 JSON 有很多这样的产品 (100k+):
[
{
"name": "Foo",
"variants": [{ "value": "red" }, { "value": "green" }, { "value": "blue" }]
},
{
"name": "Bar",
"variants": [{ "value": "red" }, { "value": "yellow" }]
},
...
]
我应该从中生成查询以插入产品。
我的想法是使用这样的 insert
查询:
INSERT INTO `products` (name) VALUES ("foo"), ("bar"), ...;
但是我不知道在 product_variants
:
product_id
(外键)
INSERT INTO `product_variants` (product_id,value) VALUES (?,"red"), (?,"green"), ...;
(事务内的这些查询)
我想过手动指定产品 ID,从最后一个 ID 开始递增,但是当并发连接同时插入产品或 2 个或更多批量插入进程时我会出错 运行同时
我可以使用什么策略来实现我的目标?有没有标准的方法来做到这一点?
ps:如果可能的话我不想改变这两个表的结构。
您可以使用 last_insert_id()
从最后一条语句中获取最后生成的 ID。但是,如前所述,由于此仅获取语句的最后一个 ID,因此需要您单独处理每个产品。不过,您可以批量插入变体。但是从给定 JSON 的结构来看,我认为这使得遍历 JSON 变得更加容易。每个产品及其变体都应插入交易中,这样如果 INSERT
到产品 table 由于某种原因失败,则产品的变体不会添加到之前的产品中。
START TRANSACTION;
INSERT INTO products
(name)
VALUES ('Foo');
INSERT INTO product_variants
(product_id,
value)
VALUES (last_insert_id(),
'red'),
(last_insert_id(),
'green'),
(last_insert_id(),
'blue');
COMMIT;
START TRANSACTION;
INSERT INTO products
(name)
VALUES ('Bar');
INSERT INTO product_variants
(product_id,
value)
VALUES (last_insert_id(),
'red'),
(last_insert_id(),
'yellow');
COMMIT;
如果您在 table 中已经有了 JSON,那么它可能可以通过两个语句(非常有效地)完成:
INSERT INTO Products (name)
SELECT name
FROM origial_table; -- to get the product names
INSERT INTO Variants (product_id, `value`)
SELECT ( SELECT id FROM Products WHERE name = ot.name ),
`value`
FROM origial_table AS ot;
实际上,name
和 value
需要是 suitable JSON 表达式才能提取值。
如果您担心第一个 table 中有很多重复的 "products",请确保 UNIQUE(name)
。您可以通过此处描述的两步过程避免 "burning" id:mysql.rjweb.org/doc.php/staging_table#normalization
最后,我使用了一种使用 MySQL 函数 LAST_INSERT_ID()
的策略,就像 @sticky-bit sad 但使用批量插入(许多产品需要 1 个插入)要快得多。
我附加了一个简单的 Ruby 脚本来执行批量插入。一切似乎也适用于并发插入。
我已经 运行 带有标志 innodb_autoinc_lock_mode = 2
的脚本,一切看起来都不错,但我不知道是否有必要将标志设置为 1:
require 'active_record'
require 'benchmark'
require 'mysql2'
require 'securerandom'
ActiveRecord::Base.establish_connection(
adapter: 'mysql2',
host: 'localhost',
username: 'root',
database: 'test',
pool: 200
)
class ApplicationRecord < ActiveRecord::Base
self.abstract_class = true
end
class Product < ApplicationRecord
has_many :product_variants
end
class ProductVariant < ApplicationRecord
belongs_to :product
COLORS = %w[red blue green yellow pink orange].freeze
end
def migrate
ActiveRecord::Schema.define do
create_table(:products) do |t|
t.string :name
end
create_table(:product_variants) do |t|
t.references :product, null: false, foreign_key: true
t.string :color
end
end
end
def generate_data
d = []
100_000.times do
d << {
name: SecureRandom.alphanumeric(8),
product_variants: Array.new(rand(1..3)).map do
{ color: ProductVariant::COLORS.sample }
end
}
end
d
end
DATA = generate_data.freeze
def bulk_insert
# All inside a transaction
ActiveRecord::Base.transaction do
# Insert products
values = DATA.map { |row| "('#{row[:name]}')" }.join(',')
q = "INSERT INTO products (name) VALUES #{values}"
ActiveRecord::Base.connection.execute(q)
# Get last insert id
q = 'SELECT LAST_INSERT_ID()'
last_id, = ActiveRecord::Base.connection.execute(q).first
# Insert product variants
i = -1
values = DATA.map do |row|
i += 1
row[:product_variants].map { |subrow| "(#{last_id + i},'#{subrow[:color]}')" }
end.flatten.join(',')
q = "INSERT INTO product_variants (product_id,color) VALUES #{values}"
ActiveRecord::Base.connection.execute(q)
end
end
migrate
threads = []
# Spawn 100 threads that perform 200 single inserts each
100.times do
threads << Thread.new do
200.times do
Product.create(name: 'CONCURRENCY NOISE')
end
end
end
threads << Thread.new do
Benchmark.bm do |benchmark|
benchmark.report('Bulk') do
bulk_insert
end
end
end
threads.map(&:join)
在 运行 脚本之后,我检查了所有产品都具有与查询关联的变体
SELECT *
FROM products
LEFT OUTER JOIN product_variants
ON (products.id = product_variants.product_id)
WHERE product_variants.product_id IS NULL
AND name != "CONCURRENCY NOISE";
正确的是我没有得到任何行。