MySQL used with Airflow runs into a lot of deadlocks

I am running into a lot of deadlocks while using Airflow with MySQL as the metadata database. The InnoDB deadlock log is:

LATEST DETECTED DEADLOCK

191113 18:05:59
*** (1) TRANSACTION:
TRANSACTION 5BDBA005, ACTIVE 0 sec fetching rows
mysql tables in use 2, locked 2
LOCK WAIT 20 lock struct(s), heap size 3112, 89 row lock(s)
MySQL thread id 108349930, OS thread handle 0x7f9a3aa68700, query id 5378923356 10.13.33.22 root Sending data
UPDATE task_instance, dag_run SET task_instance.state=NULL WHERE task_instance.dag_id = 'test_dag_01' AND task_instance.state IN ('queued', 'scheduled') AND dag_run.dag_id = task_instance.dag_id AND dag_run.execution_date = task_instance.execution_date AND dag_run.state != 'running'
*** (1) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 0 page no 503427 n bits 168 index `PRIMARY` of table `airflow`.`task_instance` trx id 5BDBA005 lock_mode X locks rec but not gap waiting
*** (2) TRANSACTION:
TRANSACTION 5BDBA004, ACTIVE 0 sec updating or deleting
mysql tables in use 1, locked 1
3 lock struct(s), heap size 1248, 2 row lock(s), undo log entries 1
MySQL thread id 108349929, OS thread handle 0x7f9a38cec700, query id 5378923359 10.13.33.22 root Updating
UPDATE task_instance SET state='queued', queued_dttm='2019-11-13 18:05:59.392656' WHERE task_instance.dag_id = 'test_dag_01' AND task_instance.task_id = 'test_task_01_03' AND task_instance.execution_date = '2019-11-08 00:00:00'
*** (2) HOLDS THE LOCK(S):
RECORD LOCKS space id 0 page no 503427 n bits 168 index `PRIMARY` of table `airflow`.`task_instance` trx id 5BDBA004 lock_mode X locks rec but not gap
*** (2) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 0 page no 417617 n bits 360 index `ti_dag_state` of table `airflow`.`task_instance` trx id 5BDBA004 lock_mode X locks rec but not gap waiting
*** WE ROLL BACK TRANSACTION (2)

TRANSACTIONS

Trx id counter 5BEC503C
Purge done for trx's n:o < 5BEC4F55 undo n:o < 0
History list length 2924
LIST OF TRANSACTIONS FOR EACH SESSION:
---TRANSACTION 5BEC5039, not started
MySQL thread id 108413843, OS thread handle 0x7f9a3a32e700, query id 5384428783 10.13.33.21 root
---TRANSACTION 5BEC503B, not started
MySQL thread id 108412780, OS thread handle 0x7f9a3a206700, query id 5384428790 10.13.33.20 root
---TRANSACTION 5BEC4D73, not started
MySQL thread id 108398996, OS thread handle 0x7f9a38b7a700, query id 5384425580 10.13.33.22 root
---TRANSACTION 5BE3DE1B, not started
MySQL thread id 108394511, OS thread handle 0x7f9a3af52700, query id 5381874736 10.13.33.20 root
---TRANSACTION 5BEC44D1, not started
MySQL thread id 108394257, OS thread handle 0x7f9a38d36700, query id 5384415004 10.13.33.20 root
---TRANSACTION 5BEC468B, not started
MySQL thread id 108394256, OS thread handle 0x7f9a3ad02700, query id 5384417058 10.13.33.20 root
---TRANSACTION 5BEC4696, not started
MySQL thread id 108394255, OS thread handle 0x7f9a3a4ea700, query id 5384417099 10.13.33.20 root
---TRANSACTION 5BEC3439, not started
MySQL thread id 108394254, OS thread handle 0x7f9a39754700, query id 5384394965 10.13.33.20 root
---TRANSACTION 5BE3C3B2, not started
MySQL thread id 108394242, OS thread handle 0x7f9a39426700, query id 5381837526 10.13.33.20 root
---TRANSACTION 5BEC4C50, not started
MySQL thread id 108394158, OS thread handle 0x7f9a3b07a700, query id 5384424099 10.13.33.20 root
---TRANSACTION 5BEC4E73, not started
MySQL thread id 108391224, OS thread handle 0x7f9a3aa68700, query id 5384426741 10.13.33.22 root
---TRANSACTION 5BEC3C6E, not started
MySQL thread id 108390046, OS thread handle 0x7f9a38a9c700, query id 5384404901 10.13.33.20 root
---TRANSACTION 5BEC3AD7, not started
MySQL thread id 108390031, OS thread handle 0x7f9a39bf4700, query id 5384403015 10.13.33.20 root
---TRANSACTION 5BE07D02, not started
MySQL thread id 108336566, OS thread handle 0x7f9a3a1bc700, query id 5384428791 10.13.33.20 root
show engine innodb status

END OF INNODB MONITOR OUTPUT

The two tables involved are task_instance and dag_run. task_instance stores the task run records, and dag_run stores the DAG run records:

CREATE TABLE `task_instance` (
  `task_id` varchar(250) NOT NULL,
  `dag_id` varchar(250) NOT NULL,
  `execution_date` datetime(6) NOT NULL,
  `start_date` datetime(6) DEFAULT NULL,
  `end_date` datetime(6) DEFAULT NULL,
  `duration` float DEFAULT NULL,
  `state` varchar(20) DEFAULT NULL,
  `try_number` int(11) DEFAULT NULL,
  `hostname` varchar(1000) DEFAULT NULL,
  `unixname` varchar(1000) DEFAULT NULL,
  `job_id` int(11) DEFAULT NULL,
  `pool` varchar(50) DEFAULT NULL,
  `queue` varchar(50) DEFAULT NULL,
  `priority_weight` int(11) DEFAULT NULL,
  `operator` varchar(1000) DEFAULT NULL,
  `queued_dttm` datetime(6) DEFAULT NULL,
  `pid` int(11) DEFAULT NULL,
  `feedback` text,
  `run_type` tinyint(1) DEFAULT NULL,
  PRIMARY KEY (`task_id`,`dag_id`,`execution_date`),
  KEY `ti_dag_state` (`dag_id`,`state`),
  KEY `ti_pool` (`pool`,`state`,`priority_weight`),
  KEY `ti_state_lkp` (`dag_id`,`task_id`,`execution_date`,`state`),
  KEY `ti_state` (`state`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


CREATE TABLE `dag_run` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `dag_id` varchar(250) DEFAULT NULL,
  `execution_date` datetime(6) DEFAULT NULL,
  `state` varchar(50) DEFAULT NULL,
  `run_id` varchar(250) DEFAULT NULL,
  `external_trigger` tinyint(1) DEFAULT NULL,
  `conf` blob,
  `end_date` datetime(6) DEFAULT NULL,
  `start_date` datetime(6) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `dag_id` (`dag_id`,`execution_date`),
  UNIQUE KEY `dag_id_2` (`dag_id`,`run_id`),
  KEY `dag_id_state` (`dag_id`,`state`)
) ENGINE=InnoDB AUTO_INCREMENT=53394 DEFAULT CHARSET=utf8;

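And as the number of Airflow DAGs grows, the deadlocks become more and more frequent, which makes the Airflow scheduler retry a lot. Is there any way to reduce the deadlocks?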

When I run this SQL:

explain select * from task_instance WHERE task_instance.dag_id = 'test_dag_01' AND task_instance.task_id = 'test_task_01_03' AND task_instance.execution_date = '2019-11-08 00:00:00';

it returns:

+------+-------------+---------------+-------+-----------------------------------+---------+---------+-------------------+------+-------+
| id   | select_type | table         | type  | possible_keys                     | key     | key_len | ref               | rows | Extra |
+------+-------------+---------------+-------+-----------------------------------+---------+---------+-------------------+------+-------+
|    1 | SIMPLE      | task_instance | const | PRIMARY,ti_dag_state,ti_state_lkp | PRIMARY | 1512    | const,const,const |    1 |       |
+------+-------------+---------------+-------+-----------------------------------+---------+---------+-------------------+------+-------+

Speeding up the queries is one way to reduce the frequency of deadlocks. I see one more index that you need:

task_instance:  INDEX(dag_id, state)  -- got it
task_instance:  INDEX(dag_id, execution_date, state)  -- need
task_instance:  INDEX(dag_id, task_id, execution_date)  -- got it
dag_run:  INDEX(dag_id, execution_date)  -- got it
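To see why the extra index matters, here is a rough Python sketch of the B-tree leftmost-prefix rule applied to the large UPDATE in transaction (1). It is an illustration, not MySQL's optimizer: range handling and the cost model are ignored, and IN lists are treated like equalities. That UPDATE pins task_instance.dag_id and task_instance.state, plus task_instance.execution_date when the join is driven from dag_run. The index name ti_dag_date_state and the helper names are hypothetical.

```python
# Simplified model of the B-tree "leftmost prefix" rule. It ignores range
# conditions and the optimizer's cost model; it only shows how many leading
# columns of each index the WHERE clause can use with equality / IN lookups.

TASK_INSTANCE_INDEXES = {
    # Copied from the CREATE TABLE task_instance statement above.
    "PRIMARY": ("task_id", "dag_id", "execution_date"),
    "ti_dag_state": ("dag_id", "state"),
    "ti_pool": ("pool", "state", "priority_weight"),
    "ti_state_lkp": ("dag_id", "task_id", "execution_date", "state"),
    "ti_state": ("state",),
}


def usable_prefix(index_cols, equality_cols):
    """Leading index columns usable when `equality_cols` are fixed by the query."""
    used = []
    for col in index_cols:
        if col not in equality_cols:
            break  # the first unconstrained column ends the usable prefix
        used.append(col)
    return tuple(used)


def best_index(indexes, equality_cols):
    """Return (name, columns) of the index with the longest usable prefix."""
    return max(indexes.items(), key=lambda item: len(usable_prefix(item[1], equality_cols)))


# Columns of task_instance pinned by the UPDATE in transaction (1):
# dag_id = '...', state IN (...), execution_date = dag_run.execution_date.
UPDATE_COLS = {"dag_id", "state", "execution_date"}

if __name__ == "__main__":
    for name, cols in TASK_INSTANCE_INDEXES.items():
        print(f"{name:14} uses {usable_prefix(cols, UPDATE_COLS)}")
    print("best today:", best_index(TASK_INSTANCE_INDEXES, UPDATE_COLS)[0])

    # Hypothetical name for the suggested INDEX(dag_id, execution_date, state).
    proposed = dict(TASK_INSTANCE_INDEXES, ti_dag_date_state=("dag_id", "execution_date", "state"))
    print("best with new index:", best_index(proposed, UPDATE_COLS)[0])
```

With the current indexes the best match is ti_dag_state, which narrows the scan only to every row of the DAG in the given states; the suggested index also narrows by execution_date. In MySQL it could be added with something like ALTER TABLE task_instance ADD INDEX ti_dag_date_state (dag_id, execution_date, state); where the index name is again hypothetical.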

Almost all of the columns are NULLable. That seems unrealistic.

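You have 3 unique keys on dag_run (PRIMARY KEY(id) plus the two UNIQUE keys). If you don't need id, and if you can make dag_id NOT NULL along with one of the other columns (execution_date or run_id), then promote that pair to be the PRIMARY KEY.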

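Do you really need so many VARCHAR(250) columns? If not, shrink them to reasonable sizes. That will shrink the tables (especially the indexes), which will help somewhat.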
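The EXPLAIN output above gives a concrete number to work with: key_len 1512 for the PRIMARY KEY. The arithmetic below is a sketch assuming MySQL's usual key_len rules (utf8 counts up to 3 bytes per character, each VARCHAR adds a 2-byte length prefix, a NULLable column adds 1 byte, DATETIME(6) is 5 + 3 bytes). It reproduces 1512 and shows what a VARCHAR(100) would give; 100 is only an illustrative size. Note that key_len is the declared worst case, while InnoDB stores VARCHAR values at their actual length, so the real on-disk saving depends on the data.

```python
import math

UTF8_BYTES_PER_CHAR = 3  # MySQL "utf8" (utf8mb3), as in DEFAULT CHARSET=utf8


def varchar_key_bytes(n_chars, nullable=False):
    """key_len contribution of VARCHAR(n_chars): max bytes + 2-byte length (+1 if NULLable)."""
    return n_chars * UTF8_BYTES_PER_CHAR + 2 + (1 if nullable else 0)


def datetime_key_bytes(fsp, nullable=False):
    """DATETIME(fsp): 5 bytes + ceil(fsp / 2) bytes of fractional seconds (+1 if NULLable)."""
    return 5 + math.ceil(fsp / 2) + (1 if nullable else 0)


def task_instance_pk_len(varchar_len):
    """PRIMARY KEY (task_id, dag_id, execution_date); all three columns are NOT NULL."""
    return 2 * varchar_key_bytes(varchar_len) + datetime_key_bytes(6)


if __name__ == "__main__":
    print(task_instance_pk_len(250))  # 1512, matching key_len in the EXPLAIN output
    print(task_instance_pk_len(100))  # 612 with hypothetical VARCHAR(100) ids
    # ti_dag_state (dag_id, state): state is VARCHAR(20) and NULLable.
    print(varchar_key_bytes(250) + varchar_key_bytes(20, nullable=True))  # 815
```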

Better indexes also help avoid locking more rows than necessary.

Other queries in the transactions: what else is in each transaction? Perhaps a SELECT ... FOR UPDATE locked something that we cannot see in the output provided?

A deadlock can occur when different transactions lock the same rows in different orders.
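The log above appears to show this pattern: transaction (2) holds an X lock on a task_instance PRIMARY record and waits for an entry in ti_dag_state (its UPDATE changes state, so that secondary index entry must be modified too), while transaction (1), which already holds 89 row locks from scanning by (dag_id, state), waits for that same PRIMARY record. The usual application-side remedy is to take locks in one global order. The Python sketch below models row locks with threading.Lock objects, a hypothetical stand-in rather than how InnoDB works internally: two workers request the same two rows in opposite orders, but because the hypothetical locked_rows helper always sorts by primary key, they cannot deadlock. Inside MySQL, the order in which a single UPDATE locks rows is decided by its access path, so this only helps for work the application issues as separate per-row statements.

```python
import threading
from contextlib import contextmanager

# Hypothetical stand-in for row locks: one threading.Lock per task_instance
# primary key (task_id, dag_id, execution_date). Not how InnoDB is built.
_row_locks = {}
_registry_guard = threading.Lock()


def _lock_for(key):
    # Guard the dict so two threads never create two locks for one key.
    with _registry_guard:
        return _row_locks.setdefault(key, threading.Lock())


@contextmanager
def locked_rows(keys):
    """Lock every key in one global order (sorted by primary key tuple)."""
    acquired = []
    try:
        for key in sorted(set(keys)):
            lock = _lock_for(key)
            lock.acquire()
            acquired.append(lock)
        yield
    finally:
        for lock in reversed(acquired):
            lock.release()


def worker(keys, rounds, finished):
    for _ in range(rounds):
        with locked_rows(keys):
            pass  # the per-row UPDATEs would run here
    finished.append(threading.current_thread().name)


def run_demo(rounds=10_000):
    row_a = ("test_task_01_03", "test_dag_01", "2019-11-08 00:00:00")
    row_b = ("test_task_01_04", "test_dag_01", "2019-11-08 00:00:00")  # hypothetical second task
    finished = []
    # The two workers request the rows in opposite orders on purpose.
    t1 = threading.Thread(target=worker, args=([row_a, row_b], rounds, finished), name="t1")
    t2 = threading.Thread(target=worker, args=([row_b, row_a], rounds, finished), name="t2")
    for t in (t1, t2):
        t.start()
    for t in (t1, t2):
        t.join(timeout=30)
    return sorted(finished), any(t.is_alive() for t in (t1, t2))


if __name__ == "__main__":
    print(run_demo())  # (['t1', 't2'], False): both workers finish
```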

Normalize -- another way to shrink the tables is to replace strings with integers. Consider hostname, unixname, pool, queue, state, operator.
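A minimal sketch of the idea, assuming a hypothetical Normalizer helper: each distinct string (a state, pool, queue, and so on) gets a small integer id, which in the database would live in a lookup table referenced from task_instance. A TINYINT UNSIGNED id takes 1 byte, versus a declared worst case of 20 × 3 + 2 = 62 bytes for state VARCHAR(20) in utf8. Airflow's ORM models define these columns as strings, so applying this to the metadata database would likely mean changing Airflow itself rather than only the schema.

```python
class Normalizer:
    """Hypothetical helper that maps repeated strings to small integer ids.

    In MySQL this would correspond to a lookup table, e.g. a hypothetical
    task_state_lookup(id TINYINT UNSIGNED PRIMARY KEY, name VARCHAR(20) UNIQUE),
    with task_instance storing the id instead of the string.
    """

    def __init__(self):
        self._ids = {}    # name -> id
        self._names = []  # position id - 1 -> name

    def id_for(self, name):
        """Return the id for `name`, assigning the next one (1, 2, ...) if new."""
        if name not in self._ids:
            self._names.append(name)
            self._ids[name] = len(self._names)
        return self._ids[name]

    def name_for(self, id_):
        """Reverse lookup, like joining back to the lookup table."""
        if not 1 <= id_ <= len(self._names):
            raise KeyError(id_)
        return self._names[id_ - 1]


if __name__ == "__main__":
    states = Normalizer()
    ids = [states.id_for(s) for s in ["queued", "scheduled", "running", "queued"]]
    print(ids)                 # [1, 2, 3, 1]
    print(states.name_for(2))  # scheduled
```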