事务与批量查询以避免重复 MySQL 插入

Transaction vs Batch Query to Avoid Duplicate MySQL Inserts

我有一个 PHP 脚本 (deleteAndReInsert.php),它删除所有 name = 'Bob' 的行,然后插入 1000 个 name = 'Bob' 的新行。这工作正常,并且最初为空的 table 最终如预期的那样总共有 1000 行。

$query = $pdo->prepare("DELETE FROM table WHERE name=?");
$query->execute(['Bob']);

$query = $pdo->prepare("INSERT INTO table (name, age) VALUES (?,?)");
for ($i = 0; $i < 1000; $i++)
{
    $query->execute([ 'name' => 'Bob', 'age' => 34 ]);
}

问题是如果我 运行 deleteAndReInsert.php 两次(几乎在同一时间),最后的 table 包含超过 1000 行。

似乎正在发生的事情是第一个 运行 的 DELETE 查询完成,然后许多(但不是全部)1000 INSERTS 被调用。

然后第二个 DELETE 查询开始并在前 1000 个 INSERTS 完成之前完成(比如 1000 个 INSERTS 中的 350 个完成)。现在第二个 1000 INSERTS 运行s,我们最终得到 1650 行而不是 1000 行,因为在第二个 DELETE 被调用后仍然有 1000 - 350 = 650 INSERTS 剩余。

防止此问题发生的正确方法是什么?我应该将所有内容都包装在一个事务中,还是应该进行 1 次批量插入调用而不是 1000 次单独插入?显然我可以实施这两种解决方案,但我很好奇哪一个可以保证避免这个问题。

你必须锁定操作并且在插入结束之前不要释放它。

您可以使用文件系统上的文件,但正如@chris Hass 建议的那样,您可以像这样使用 symfony 的包:

安装 symfony 锁:

composer require symfony/lock

你应该包括作曲家的自动加载

require __DIR__.'/vendor/autoload.php';

然后在你的 deleteAndReInsert.php 中:

use Symfony\Component\Lock\LockFactory;
use Symfony\Component\Lock\Store\SemaphoreStore;

//if you are on windows or for any reason this store(FlockStore) didnt work
// you can use another stores available here: https://symfony.com/doc/current/components/lock.html#available-stores 
$store = new FlockStore();
$factory = new LockFactory($store);
$lock = $factory->createLock('bob-recreation');
$lock->acquire(true)

$query = $pdo->prepare("DELETE FROM table WHERE name=?");
$query->execute(['Bob']);

$query = $pdo->prepare("INSERT INTO table (name, age) VALUES (?,?)");
for ($i = 0; $i < 1000; $i++)
{
    $query->execute([ 'name' => 'Bob', 'age' => 34 ]);
}
$lock->release();

发生了什么

正如您提到的那样,发生的事情是 race condition:

If Two Concurrent process are accessing a Shared Resource, That resembles the Critical Section Which maybe needs to get protected with locks

使用事务+批量插入

我认为解决问题的正确方法是使用事务。我们要做一个删除+批量插入,代码如下:

$pdo->beginTransaction();
$query = $pdo->prepare("DELETE FROM table WHERE name=?");
$query->execute(['Bob']);

$sql = "INSERT INTO table (name, age) VALUES ".implode(', ',array_fill(0,999, '(:name, :age)'));
$query = $sth->prepare($sql);
$query->execute(array([ ':name' => 'Bob', 'age' => 34 ]));

$pdo->commit();

仅使用批量插入(行不通)

为什么只做批量插入不能解决问题?想象一下以下场景:

  1. 第一个脚本进行删除并移除前 1000 行。 ==> 表格 1000 行到 0.
  2. 第二个脚本尝试删除但没有行。 ==> 表格 0 行到 0.
  3. 第一个(或第二个)脚本进行 1000 次批量插入。 ==> 表格 1000 行到 1000.
  4. 第二个(或第一个)脚本进行第二个 1000 批量插入。 ==> 表格 1000 行到 2000.

这就是进程异步的原因,因此第二个脚本可能会在第一个脚本完成插入之前读取 table。

使用辅助 table 模拟锁(不推荐)

如果我们没有交易,我们将如何解决这个问题?我认为这是一个交叉练习。

这是一个经典的并发问题,其中有两个或多个进程修改相同的数据。为了解决这个问题,我建议你使用第二个辅助 table 来模拟一个 Lock 并控制对主 table.

的并发访问。
CREATE TABLE `access_table` (
  `access` TINYINT(1) NOT NULL DEFAULT 1
)

并且在脚本中

// Here we control the concurrency
do{
   $query = $st->prepare('UPDATE access_table SET access = 0 WHERE access = 1');
   $query ->execute();
   $count = $query ->rowCount();
   // You should put here a random sleep
}while($count === 0);


//Here we know that only us we are modifying the table
$query = $pdo->prepare("DELETE FROM table WHERE name=?");
$query->execute(['Bob']);

$query = $pdo->prepare("INSERT INTO table (name, age) VALUES (?,?)");
for ($i = 0; $i < 1000; $i++)
{
    $query->execute([ 'name' => 'Bob', 'age' => 34 ]);
}


//And finally we open the table for other process
$query = $st->prepare('UPDATE access_table SET access = 1 WHERE access = 0');
$query ->execute();

您可以根据您的问题调整 table,例如,如果 INSERTS/DELETES 是名称,您可以使用 varchar(XX) 作为名称。

CREATE TABLE `access_table` (
  `name` VARCHAR(50) NOT NULL,
  `access` TINYINT(1) NOT NULL DEFAULT 1
)

有了这个场景

  1. 第一个脚本将访问值更改为 0。
  2. 第二个脚本无法更改值,因此它处于循环中
  3. 第一个脚本使 DELETES/INSERTS
  4. 第一个脚本将状态更改为 1
  5. 第二个脚本将访问值更改为 0 并破坏了外观。
  6. 第二个脚本使 DELETES/INSERTS
  7. 第二个脚本将状态更改为 1

这是因为更新是原子的,这意味着两个进程不能同时更新同一个日期,所以当第一个脚本更新值时,第二个脚本不能修改,那个动作是原子的。

希望对你有所帮助。

计数为近似值

SHOW TABLE STATUS(以及此类的许多变体)仅提供行数的估计值。 (请说明您是如何获得“1650”的。)

准确的计算方式是

SELECT COUNT(*) FROM table;

进一步讨论

“事务锁定”主要有两种方式。两者都防止 其他 连接干扰。

  • 自动提交:

    SET autocommit = ON;   -- probably this is the default
    -- Now each SQL statement is a separate "transaction"
    
  • 开始...提交

    BEGIN;  -- (this is performed in a variety of ways by the db layer)
    delete...
    insert...
    COMMIT;  --everything above either entire happens or is entirely ROLLBACK'd
    

性能:

  • DELETE --> TRUNCATE
  • 批量插入(单次插入 1000 行 INSERT
  • BEGIN...COMMIT
  • LOAD DATA 而不是 INSERT

但是none的表现技巧会改变你遇到的问题——除了“巧合”。


为什么是 1650?

(或其他一些数字)InnoDB 的事务性质要求它挂在正在删除或插入的行的先前副本上,直到 COMMIT(无论是显式还是自动提交)。这会使数据库充满可能消失的“行”。因此,任何试图推算确切行数的尝试都是不切实际的。

这导致使用不同的技术来估计行数。它是这样的: table 占用了这么多磁盘,我们估计平均行有这么多字节。将它们相除以获得行数。

这导致您关于删除未完成的理论。就任何 SQL 而言,删除 完成。但是临时保存的1000行副本还没有彻底清理掉table。因此,行数计算不精确。


锁定?

锁定技术将“修复”1650。如果您不想要其他线程 inserting/deleting 行而 运行 您的删除,则需要锁定+插入实验。您应该为此目的使用锁定。

同时,如果您想要精确计数,您必须使用COUNT(*)

What's the correct way to prevent this problem from happening?

这不是问题,并且是两个页面访问数据库中相同 table 的预期行为。

Should I wrap everything in a transaction, or should I make 1 batch insert call instead of 1000 individual inserts? Obviously I can implement both of these solutions, but I'm curious as to which one is guaranteed to prevent this problem.

除了根据您运行.

的页数将插入量限制为 n000 之外,不会有任何差别。

场景 1 - 什么都不做

有两页运行一个接一个或在相似的时间。这就是为什么您看到 1650 条记录是由于 execute 方法中的隐式事务,允许其他进程(在您的情况下是页面)访问 table.

中的数据
Action Page a Page b Table Row count
1 Deletes all Bobs 0
... Insert a row 1
351 Insert a row Deletes all Bobs 0
352 Insert a row Insert a row 2
... Insert a row Insert a row 4
1001 Insert a row Insert a row 1298
1002 Insert a row 1299
... Insert a row ...
1352 Insert a row 1650

因此插入了 1650 个 Bob。


场景 2 - 使用显式事务(乐观) |行动 |页一|页b | Table 行数 |交易 | | ----- | ------ | ------ | --- | --- | | 1 |开始| | 0 | | | 2 |删除所有鲍勃 |开始| 0 | (a-d0)| | 3 |插入 1000 行 |删除所有鲍勃 | 0| (a-d0-i1000)(b-d1000) | | 4 |提交 |插入 1000 行 | 1000 | (b-d1000-i1000) | | 5 | |提交 | 1000 |


场景 3 - 添加锁定 |行动 |页一|页b | Table 行数 | | ----- | ------ | ------ | --- | | 1 | AQ锁| | 0 | | 2 |开始| | 0 | | 3 |删除所有鲍勃 | AQ锁| 0 | | 4 |插入 1000 行 |没有锁| 0| | 5 |提交 |没有锁 | 1000 |
| 6 | | AQ锁| 1000 | | 6 | |开始| 1000 | | 6 | |删除所有鲍勃 | 1000 (0) | | 6 | |插入 1000 行 | 1000 (1000) | | 6 | |解锁 | 1000 |

其他解决方案的替代方法是在脚本启动时创建一个实际的锁定文件,并检查它是否在 运行 之前存在。

while( file_exists("isrunning.lock") ){
    sleep(1);
}

//create file isrunning.lock
$myfile = fopen("isrunning.lock", "w");

//deleteAndinsert code

//delete lock file when finished
fclose($myfile);
unlink("isrunning.lock");

如果有另一个实例,您可以检查服务器上的进程列表并阻止您的脚本执行。

你打了两次deleteAndReInsert.php,每个脚本有1001条命令,首先是删除所有name = Bob,剩下的是再次插入1000次Bob。 所以你总共有 2002 个命令,你没有声明让 Mysql 明白你想同步执行它的东西,你的 2002 个命令将 运行 并发,并会导致意想不到的结果. (插入了 1000 多个 name= Bob)。这个过程可以这样描述:

->delete `name= bob` (clear count = 0)
->insert `name = bob`
->insert `name = bob`
->insert `name = bob`
->insert `name = bob`
....
->insert `name = bob`
->delete `name= bob` (the second time deleteAndReInsert.php 
hit deleted at 300 times insert `name = bob` of first
 time deleteAndReInsert.php -> clear count rows = 0)
->insert `name = bob`
->insert `name = bob`
->insert `name = bob`
....
-> insert `name = bob` (now it could be more than 1000 rows)

所以如果你想要结果是 1000 行。你必须让 mysql 明白:我希望 deleteAndReInsert.php 和 运行 同步,一步一步。并存档您可以执行以下解决方案之一:

  1. 使用 LOCK TABLE 语句在完成时锁定 table 和 UNLOCK,这使得第二个脚本不能对 table 做任何事情,除非第一个脚本完成。
  2. 将所有事务包装在事务 BEGIN COMMIT 中,然后 mysql 将 运行 作为原子操作。 (好)
  3. 通过 redis (Redlock)、文件 .. 模拟 LOCK 使您的操作 运行 同步(良好)

希望能帮到您解决问题。

您要做的是发布 LOCK TABLE ... WRITE 作为您工作的第一条声明,并发布 RELEASE TABLES 作为最后一条。

然后这千行会被删除,然后插入,然后删除,然后再插入。

但整个过程对我来说就像一个 XY 问题。您真正需要做什么?

因为我经常需要做你描述的这样的事情(例如“刷新”一些摘要),在那种情况下,我认为最好的方法是,既没有 LOCK 也没有 DELETE/INSERT,而是

INSERT INTO table
    ON DUPLICATE KEY UPDATE ...

在我的例子中,如果我只需要添加刷新记录,就足够了。

否则,我通常会添加一个“时间”字段,使我能够识别刷新周期中“遗漏”的所有记录;那些 - 并且只有那些 - 在完成后被删除。

例如,我需要通过复杂的PHP计算,计算许多客户的最大财务风险,然后插入到table中以便于使用。每个客户每天晚上都会刷新其值,然后在第二天使用“缓存”table。截断 table 并重新插入所有内容很痛苦。

相反,我计算所有值并构建一个非常大的多 INSERT 查询(如果需要,我可以将其拆分为 X 个较小的多查询):

SELECT barrier:=NOW();
INSERT INTO `financial_exposures` ( ..., amount, customer_id, last_update )
    VALUES 
    ( ..., 172035.12, 12345, NOW()),
    ( ..., 123456.78, 12346, NOW()),
    ...
    ( ..., 450111.00, 99999, NOW())
     ON DUPLICATE KEY UPDATE 
         amount=VALUES(amount),
         last_update=VALUES(last_update);
 DELETE FROM financial_exposures WHERE last_update < @barrier;

插入新客户,更新旧客户,除非他们的值不变(在这种情况下 MySQL 跳过更新,节省时间),并且在每一刻,总是存在一条记录 - 之前的一条更新,或更新后的那个。删除的客户作为最后一步被删除。

当您有需要经常使用 更新的 table 时,此方法效果更好。您可以添加一个没有锁定的事务(SET autocommit = 0INSERT 之前,COMMIT WORKDELETE 之后)以确保所有客户端都能看到整个更新,就好像它是即时发生的一样。

@Pericodes 的回答是正确的,但是代码片段有错误。

您可以通过将代码包装在事务中来避免重复(批量插入不需要停止重复)。

最好使用 1 个批量插入而不是 1000 个单独的插入,即使这不是必需的。

您可以通过 运行 这段代码测试两次(几乎同时),并且 table 最终恰好有 1000 条记录。

<?
$pdo->beginTransaction();

$query = $pdo->prepare("DELETE FROM t1 WHERE name=?");
$query->execute(['Bob']);

$query = $pdo->prepare("INSERT INTO t1 (name, age) VALUES (:name,:age)");
for ($i = 0; $i < 100; $i++)
{
    $query->execute([ 'name' => 'Bob', 'age' => 34 ]);
}

$pdo->commit();

有几个答案提到了锁(数据库级和代码级),但这些对于这个问题来说不是必需的,而且在我看来是多余的。