MySQL 在多个表上批量插入

MySQL bulk insert on multiple tables

我有一个 MySQL 数据库,其中包含 2 个表 productsproduct_variants。一个产品有很多产品变体。这里有一个示例:

products
+----+------+
| id | name |
+----+------+
|  1 | Foo  |
|  2 | Bar  |
+----+------+

product_variants
+----+-------------+--------+
| id | product_id  | value  |
+----+-------------+--------+
| 10 |           1 | red    |
| 11 |           1 | green  |
| 12 |           1 | blue   |
| 13 |           2 | red    |
| 14 |           2 | yellow |
+----+-------------+--------+

现在我需要以最有效和最快的方式批量插入大量产品及其变体。我有一个 JSON 有很多这样的产品 (100k+):

[
  {
    "name": "Foo",
    "variants": [{ "value": "red" }, { "value": "green" }, { "value": "blue" }]
  },
  {
    "name": "Bar",
    "variants": [{ "value": "red" }, { "value": "yellow" }]
  },
  ...
]

我应该从中生成查询以插入产品。

我的想法是使用这样的 insert 查询:

INSERT INTO `products` (name) VALUES ("foo"), ("bar"), ...;

但是我不知道在 product_variants:

的插入查询中使用什么 product_id(外键)
INSERT INTO `product_variants` (product_id,value) VALUES (?,"red"), (?,"green"), ...;

(事务内的这些查询)

我想过手动指定产品 ID,从最后一个 ID 开始递增,但是当并发连接同时插入产品或 2 个或更多批量插入进程时我会出错 运行同时

我可以使用什么策略来实现我的目标?有没有标准的方法来做到这一点?

ps:如果可能的话我不想改变这两个表的结构。

您可以使用 last_insert_id() 从最后一条语句中获取最后生成的 ID。但是,如前所述,由于此仅获取语句的最后一个 ID,因此需要您单独处理每个产品。不过,您可以批量插入变体。但是从给定 JSON 的结构来看,我认为这使得遍历 JSON 变得更加容易。每个产品及其变体都应插入交易中,这样如果 INSERT 到产品 table 由于某种原因失败,则产品的变体不会添加到之前的产品中。

START TRANSACTION;
INSERT INTO products
            (name)
            VALUES ('Foo');
INSERT INTO product_variants
            (product_id,
             value)
            VALUES (last_insert_id(),
                    'red'),
                   (last_insert_id(),
                    'green'),
                   (last_insert_id(),
                    'blue');
COMMIT;

START TRANSACTION;
INSERT INTO products
            (name)
            VALUES ('Bar');
INSERT INTO product_variants
            (product_id,
             value)
            VALUES (last_insert_id(),
                    'red'),
                   (last_insert_id(),
                    'yellow');
COMMIT;

db<>fiddle

如果您在 table 中已经有了 JSON,那么它可能可以通过两个语句(非常有效地)完成:

INSERT INTO Products (name)
    SELECT name
        FROM origial_table;  -- to get the product names

INSERT INTO Variants (product_id, `value`)
    SELECT  ( SELECT id FROM Products WHERE name = ot.name ),
            `value`
        FROM origial_table AS ot;

实际上,namevalue 需要是 suitable JSON 表达式才能提取值。

如果您担心第一个 table 中有很多重复的 "products",请确保 UNIQUE(name)。您可以通过此处描述的两步过程避免 "burning" id:mysql.rjweb.org/doc.php/staging_table#normalization

最后,我使用了一种使用 MySQL 函数 LAST_INSERT_ID() 的策略,就像 @sticky-bit sad 但使用批量插入(许多产品需要 1 个插入)要快得多。

我附加了一个简单的 Ruby 脚本来执行批量插入。一切似乎也适用于并发插入。

我已经 运行 带有标志 innodb_autoinc_lock_mode = 2 的脚本,一切看起来都不错,但我不知道是否有必要将标志设置为 1:

require 'active_record'
require 'benchmark'
require 'mysql2'
require 'securerandom'

ActiveRecord::Base.establish_connection(
  adapter:  'mysql2',
  host:     'localhost',
  username: 'root',
  database: 'test',
  pool:     200
)

class ApplicationRecord < ActiveRecord::Base
  self.abstract_class = true
end

class Product < ApplicationRecord
  has_many :product_variants
end

class ProductVariant < ApplicationRecord
  belongs_to :product
  COLORS = %w[red blue green yellow pink orange].freeze
end

def migrate
  ActiveRecord::Schema.define do
    create_table(:products) do |t|
      t.string :name
    end

    create_table(:product_variants) do |t|
      t.references :product, null: false, foreign_key: true
      t.string :color
    end
  end
end

def generate_data
  d = []
  100_000.times do
    d << {
      name: SecureRandom.alphanumeric(8),
      product_variants: Array.new(rand(1..3)).map do
        { color: ProductVariant::COLORS.sample }
      end
    }
  end
  d
end

DATA = generate_data.freeze

def bulk_insert
  # All inside a transaction
  ActiveRecord::Base.transaction do
    # Insert products
    values = DATA.map { |row| "('#{row[:name]}')" }.join(',')
    q = "INSERT INTO products (name) VALUES #{values}"
    ActiveRecord::Base.connection.execute(q)

    # Get last insert id
    q = 'SELECT LAST_INSERT_ID()'
    last_id, = ActiveRecord::Base.connection.execute(q).first

    # Insert product variants
    i = -1
    values = DATA.map do |row|
      i += 1
      row[:product_variants].map { |subrow| "(#{last_id + i},'#{subrow[:color]}')" }
    end.flatten.join(',')
    q = "INSERT INTO product_variants (product_id,color) VALUES #{values}"
    ActiveRecord::Base.connection.execute(q)
  end
end

migrate

threads = []

# Spawn 100 threads that perform 200 single inserts each
100.times do
  threads << Thread.new do
    200.times do
      Product.create(name: 'CONCURRENCY NOISE')
    end
  end
end

threads << Thread.new do
  Benchmark.bm do |benchmark|
    benchmark.report('Bulk') do
      bulk_insert
    end
  end
end

threads.map(&:join)

在 运行 脚本之后,我检查了所有产品都具有与查询关联的变体

SELECT * 
FROM products
 LEFT OUTER JOIN product_variants
 ON (products.id = product_variants.product_id)
WHERE product_variants.product_id IS NULL
AND name != "CONCURRENCY NOISE";

正确的是我没有得到任何行。