Python 要更新的脚本 MySQL 报告受影响的行过多

Python Script to Update MySQL Reporting Too Many Rows Affected

早上好,

我目前正在使用 Python 编写对 MySQL 的更新脚本,它从一个 table 中提取一个值并使用该值更新另一个 table。更新是分批完成的,批大小可配置,但目前以每批 50K 行的速度运行。每个批次之间都会检查复制延迟,每次检查前有一个短暂的暂停(0.01 秒)。我遇到了以下两个问题:

  1. 该查询最初的性能是可以接受的,并且可以进行大量迭代。随着脚本运行,处理每个更新的时间无限增长,直到系统似乎停止处理所有更新。我们从每次更新大约 2-3 秒开始,在最后一次尝试时增加到 4 分钟以上,大约在 4-6 小时的迭代之后。 MySQL 服务器仍在响应,但可用缓冲区为 1(不确定是否相关)并且没有进一步的更新正在处理,尽管它们记录在脚本日志文件中。

  2. 通过全局变量收集的受影响行数,远大于系统中实际被更新的行数。我的验证方法是:统计日志文件中的迭代次数并乘以 50K,这应当等于受影响的行数;然而实际更新的行数要低得多。

脚本如下:

#!/usr/bin/python2.7

# Script Process:
# Py Script to populate all result.orderedPanelOid items from the ordered_panel_result_rel table.
# The UPDATEs are batched in 50K chunks, with replication checks between each batch.
# The largest tenant has about 1.3B result rows and can process a batch in 9-11 seconds.

import mysql.connector,time

#Core Logins
# Credentials for the central "hcmaster" host that stores the tenant list.
hcmaster_username = 'user_name'
hcmaster_password = 'pw'
hcmaster_hostname = 'host'
hcmaster_database = 'db'

#Tenant Logins
# Credentials shared by every per-tenant connection; each tenant has a
# writable master and a read-only slave used for replication-delay checks.
username = 'user_name'
password = 'pw'
hostname_master = 'master_host'
hostname_slave = 'slave_host'

#Define Class
class MySQLCursorDict(mysql.connector.cursor.MySQLCursor):
    """Cursor subclass that yields each row as a column-name -> value dict."""

    def _row_to_python(self, rowdata, desc=None):
        # Let the base class convert the raw row first, then re-shape it
        # into a dict keyed by column name; empty rows stay None.
        converted = super(MySQLCursorDict, self)._row_to_python(rowdata, desc)
        if not converted:
            return None
        return dict(zip(self.column_names, converted))

#Define Record Count
def get_record_count(db_connection, query):
    """Return the COUNT(*) for *query*.

    *query* is a template with one %s placeholder for the select list,
    e.g. "%s FROM my_table WHERE ..."; it is filled with "SELECT COUNT(*)".
    Returns the count as reported by the server (row 0, column 0).
    """
    count_cursor = db_connection.cursor()
    try:
        count_cursor.execute(query % "SELECT COUNT(*)")
        return count_cursor.fetchall()[0][0]
    finally:
        # BUG FIX: the original never closed the cursor it opened.
        count_cursor.close()

#Define Check Replication Delay
def check_replication_delay( mysql_connection ):
  id_cursor = mysql_connection.cursor( cursor_class = MySQLCursorDict )
  id_cursor.execute("show slave status")
  try:
      slave_status = id_cursor.fetchall()[0]
      print "%s delay is %d" % ( hostname_slave, int(slave_status['Seconds_Behind_Master']))
      return int(slave_status['Seconds_Behind_Master'])
  except:
      return 0

#Define UPDATE sequence
def update_sequence():
  """Run one 50K-row UPDATE batch against the tenant master.

  Waits until the RO slave is less than 30s behind, executes the batched
  UPDATE, stores the affected-row count in the global ``rowsaffected``
  (the caller loops until it reaches 0), then commits.
  """

  # Check Replication
  time.sleep(.01)
  while (check_replication_delay(slave) > 30):
      print "Delay is over 30 seconds on RO Slave.  Sleeping for 15 seconds"
      time.sleep(15)

  # Run Query to UPDATE in batches
  #The inner join is req. so that only rows in result with a corresponding rel row are attempted to be updated
  #without the inner join the script loops endlessly because there is no rel value to update result with.
  # NOTE(review): the derived table re-selects the first 50K rows matching
  # "OrderedPanelOid IS NULL" on every call, so each batch must scan past all
  # previously fixed rows -- a likely cause of the steadily growing batch
  # times described in the question; confirm with EXPLAIN.
  # NOTE(review): the join uses "ON rm.id = SOURCE.id" but the outer WHERE
  # adds "rm.id = SOURCE.ResultId"; if id and ResultId ever differ this
  # silently filters rows out of the batch -- presumably why rowcount does
  # not match iterations * 50K; verify against the data.
  tenant_cursor.execute(
      "UPDATE %s.result rm inner join (select rm1.id,rm1.OrderedPanelOid,rr1.ResultId,rr1.OrderedPanelId FROM %s.result rm1 inner join %s.ordered_panel_result_rel rr1 on rm1.id = rr1.ResultId WHERE rm1.OrderedPanelOid IS NULL ORDER BY rm1.id LIMIT 50000) as SOURCE ON rm.id = SOURCE.id SET rm.OrderedPanelOid = SOURCE.orderedPanelId WHERE rm.id = SOURCE.ResultId" % (database_encoded, database_encoded, database_encoded))

  # Get affected Rows
  rows_affected = tenant_cursor.rowcount
  global rowsaffected
  rowsaffected = rows_affected
  print rows_affected

  # Commit the UPDATEs
  tenant.commit()

#Collect All Databases
try:
    #Production Core Connection
    db = mysql.connector.connect(user=user_name, password=pw, host=host, database=db, ssl_ca='/file/location')

    #Establish hcmaster Cursor Connections
    hc_cursor = db.cursor()

    #Get Database List -- Change this out when we go full tenant list!
    query = "select databaseName from tenant_environment where databaseName IN ('db1','db2');"
    hc_cursor.execute(query)
    databases = hc_cursor.fetchall()
    tenant_list = list(zip(*databases)[0])
    tenant_listStr = str(tenant_list)

    print "Databases to execute upon: " + tenant_listStr

except Exception as err:
        e = str(err)
        print "Something has gone wrong in collecting databases! " + e

for database in databases:
    databaseStr = str(database)
    database_encoded = databaseStr.strip()[3:-3]
    print "Trying " + database_encoded

    # Production Connections
    tenant = mysql.connector.connect(user=user_name, password=pw, host=master_host, database=database_encoded,ssl_ca='/file/location')
    slave = mysql.connector.connect(user=user_name, password=pw, host=slave_host, database=database_encoded,ssl_ca='/file/location')

    # Establish Cursor Connections
    tenant_cursor = tenant.cursor()

    # Collect Start Time
    print database_encoded + " start time: " + time.strftime("%Y-%m-%d %H:%M:%S")

    try:
        update_sequence()
        while rowsaffected > 0:
            print "Number of Rows Affected: " + str(rowsaffected) + " at " + time.strftime("%H:%M:%S")
            update_sequence()
        else:
            print "Number of Rows Affected: 0"

    except Exception as err:
        e = str(err)
        print database_encoded + ".result.orderedPanelOid was not populated! Process failed! " + e

    #Collect End Time
    print database_encoded + " completion time: " + time.strftime("%Y-%m-%d %H:%M:%S") + "\n"

我将不胜感激任何帮助或想法,这两个问题都让我有些束手无策。我是一名新 DBA,但到目前为止已经编写过一些 Python 脚本。MySQL 版本为 5.6.34,使用 Python 2.7。


为便于阅读而格式化的查询:

 UPDATE %s.result rm
 inner join (select rm1.id,rm1.OrderedPanelOid,
                    rr1.ResultId,rr1.OrderedPanelId 
               FROM %s.result rm1 
              inner join %s.ordered_panel_result_rel rr1 on rm1.id = rr1.ResultId 
              WHERE rm1.OrderedPanelOid IS NULL 
              ORDER BY rm1.id 
              LIMIT 50000
            ) as SOURCE ON rm.id = SOURCE.id 
   SET rm.OrderedPanelOid = SOURCE.orderedPanelId 
 WHERE rm.id = SOURCE.ResultId

创建 Table 个语句:

CREATE TABLE `result` (
  `id` decimal(30,0) NOT NULL,
  `v` bigint(20) DEFAULT NULL,
  `createStamp` timestamp NULL DEFAULT NULL,
  `modifiedStamp` timestamp NULL DEFAULT NULL,
  `createdBy` decimal(30,0) DEFAULT NULL,
  `modifiedBy` decimal(30,0) DEFAULT NULL,
  `active` tinyint(1) NOT NULL,
  `testData` tinyint(1) NOT NULL DEFAULT '0',
  `testOid` decimal(30,0) DEFAULT NULL,
  `orderedPanelOid` decimal(30,0) DEFAULT NULL,
  `name` varchar(255) DEFAULT NULL,
  `code` varchar(255) DEFAULT NULL,
  `result` longtext,
  `numericResult` double DEFAULT NULL,
  `comment` longtext,
  `description` longtext,
  `status` varchar(100) DEFAULT NULL,
  `units` varchar(255) DEFAULT NULL,
  `refRange` varchar(255) DEFAULT NULL,
  `refRangeLowNumeric` double DEFAULT NULL,
  `refRangeHighNumeric` double DEFAULT NULL,
  `patientOid` decimal(30,0) DEFAULT NULL,
  `labOrderOid` decimal(30,0) DEFAULT NULL,
  `placeOrderNumber` varchar(255) DEFAULT NULL,
  `criticality` varchar(100) DEFAULT NULL,
  `conclusionTypeValueId` decimal(30,0) DEFAULT NULL,
  `runDate` datetime DEFAULT NULL,
  `resultDate` datetime DEFAULT NULL,
  `enteredByOid` decimal(30,0) DEFAULT NULL,
  `signedByOid` decimal(30,0) DEFAULT NULL,
  `row_modified_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `resultLocalDate` datetime DEFAULT NULL,
  `runLocalDate` datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `idx_resultDate` (`resultDate`),
  KEY `idx_runDate` (`runDate`),
  KEY `idx_name` (`name`),
  KEY `idx_code` (`code`),
  KEY `idx_patientOid` (`patientOid`),
  KEY `idx_labOrderOid` (`labOrderOid`),
  KEY `idx_placeOrderNumber` (`placeOrderNumber`),
  KEY `idx_orderedPanelOid` (`orderedPanelOid`),
  KEY `idx_enteredByOid` (`enteredByOid`),
  KEY `idx_testOid` (`testOid`),
  KEY `idx_active` (`active`),
  KEY `idx_signedByOid` (`signedByOid`),
  KEY `idx_row_modified_at` (`row_modified_at`),
  KEY `idx_resultLocalDate` (`resultLocalDate`),
  KEY `idx_runLocalDate` (`runLocalDate`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

    CREATE TABLE `ordered_panel_result_rel` (
  `id` decimal(30,0) NOT NULL,
  `orderedPanelId` decimal(30,0) NOT NULL,
  `resultId` decimal(30,0) NOT NULL,
  `row_modified_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `id_UNIQUE` (`id`),
  UNIQUE KEY `idx_uqopResultIds` (`orderedPanelId`,`resultId`),
  KEY `SET` (`orderedPanelId`,`resultId`),
  KEY `OPANELRESRELID` (`orderedPanelId`),
  KEY `idx_orderedPanelId` (`orderedPanelId`),
  KEY `idx_resultId` (`resultId`),
  KEY `idx_row_modified_at` (`row_modified_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

我猜您每个批次都在重新查找需要修复的 50K 行。但是当你修复完一个批次之后,下一个批次就必须更深入地扫描 table 才能找到?那只会越来越慢。

解决方法是根据 PRIMARY KEY 遍历 table。获取 50K 行的块,修复许多需要修复的 50K(可能小于 50K)。这将从 "slower and slower" 变为每个块的恒定、快速的速度。

我在这里讨论这种分块的细节:http://mysql.rjweb.org/doc.php/deletebig#deleting_in_chunks

其他问题...

  • 单个块中 50K 可能太多了,尤其是在使用复制的情况下。
  • 为什么是DECIMAL(30,0)?它很大(14 字节)并且可能比需要的大很多。参见 INT 及其朋友。
  • 你有多个'identical'DATABASEs吗?除非您托管某种服务,否则这通常是糟糕的架构设计。
  • 标志上的单列索引(例如 active)很少有用。
  • A PRIMARY KEY 根据定义(在 MySQL 中),UNIQUE;不要重新索引。
  • 如果 ordered_panel_result_rel 是 many:many 关系 table,请参阅 http://mysql.rjweb.org/doc.php/index_cookbook_mysql#many_to_many_mapping_table 中的提示。
  • 这两个 table 的 PRIMARY KEY(`id`) 都没有使用 AUTO_INCREMENT;您是手动设置 id 的值吗?

这是我更新的脚本。再次感谢@Rick James 在正确方向上的推动。

#!/usr/bin/python2.7

# Script Process:
# Py Script to populate all result.orderedPanelOid items from the ordered_panel_result_rel table.
# The UPDATEs are batched in 10K chunks, with replication checks between each batch.


import mysql.connector,time

#Core Logins
# Credentials for the central "hcmaster" host that stores the tenant list.
hcmaster_username = 'username'
hcmaster_password = 'pw'
hcmaster_hostname = 'host'
hcmaster_database = 'db'

#Tenant Logins
# Credentials shared by every per-tenant connection; each tenant has a
# writable master and a read-only slave used for replication-delay checks.
username = 'username'
password = 'pw'
hostname_master = 'masterhost'
hostname_slave = 'slavehost'

#Define Class
class MySQLCursorDict(mysql.connector.cursor.MySQLCursor):
    """Cursor variant returning each row as a {column_name: value} dict."""

    def _row_to_python(self, rowdata, desc=None):
        base_row = super(MySQLCursorDict, self)._row_to_python(rowdata, desc)
        # Pair column names with the converted values; keep the base-class
        # behaviour of returning None for an empty/absent row.
        return dict(zip(self.column_names, base_row)) if base_row else None

#Define Record Count
def get_record_count(db_connection, query):
    """Return the COUNT(*) for *query*.

    *query* must contain one %s placeholder for the select list; it is
    filled with "SELECT COUNT(*)" before execution.
    """
    count_cursor = db_connection.cursor()
    try:
        count_cursor.execute(query % "SELECT COUNT(*)")
        return count_cursor.fetchall()[0][0]
    finally:
        # BUG FIX: close the cursor instead of leaking it.
        count_cursor.close()

#Define Check Replication Delay
def check_replication_delay( mysql_connection ):
  id_cursor = mysql_connection.cursor( cursor_class = MySQLCursorDict )
  id_cursor.execute("show slave status")
  try:
      slave_status = id_cursor.fetchall()[0]
      print "%s delay is %d" % ( hostname_slave, int(slave_status['Seconds_Behind_Master']))
      return int(slave_status['Seconds_Behind_Master'])
  except:
      return 0

#Define UPDATE sequence
def update_sequence():

  # Check Replication
  time.sleep(.01)
  while (check_replication_delay(slave) > 30):
      print "Delay is over 30 seconds on RO Slave.  Sleeping for 15 seconds"
      time.sleep(15)

  # Begin the Batch Updates with Bracketed Ranges

  #find minimum id from set
  tenant_cursor.execute("SELECT rm.id from %s.result rm inner join %s.ordered_panel_result_rel rr on rm.id = rr.ResultId WHERE rm.id = rr.ResultId AND rm.orderedPanelOid is NULL ORDER BY rm.id ASC LIMIT 1" % (database_encoded, database_encoded))
  min = tenant_cursor.fetchone()
  print min

  #Deal with 0 rows
  if min is None:
    rows_affected = -1
    global rowsaffected
    rowsaffected = rows_affected
    pass

  else:
    # find maximum id from range
    smin = str(min[0])
    tenant_cursor.execute("SELECT rm.id from %s.result rm inner join %s.ordered_panel_result_rel rr on rm.id = rr.ResultId WHERE rm.id = rr.ResultId AND rm.orderedPanelOid is NULL AND rm.id >= %s ORDER BY rm.id ASC LIMIT 10000,1" % (database_encoded, database_encoded, smin))
    max = tenant_cursor.fetchone()
    print max

    # Execute on Last Batch of ID's
    if max is None or min[0] >= max[0]:
      smin = str(min[0])
      tenant_cursor.execute("UPDATE %s.result rm inner join %s.ordered_panel_result_rel rr on rm.id = rr.ResultId SET rm.OrderedPanelOid = rr.orderedPanelId WHERE rm.id = rr.ResultId AND rm.OrderedPanelOid IS NULL and rm.id >= %s" % (database_encoded, database_encoded, smin))

      # Get affected Rows
      rows_affected = tenant_cursor.rowcount
      global rowsaffected
      rowsaffected = rows_affected
      print rows_affected

    #Or Execute on All Other Batches of ID's
    else:
      smin = str(min[0])
      smax = str(max[0])
      tenant_cursor.execute("UPDATE %s.result rm inner join %s.ordered_panel_result_rel rr on rm.id = rr.ResultId SET rm.OrderedPanelOid = rr.orderedPanelId WHERE rm.id = rr.ResultId AND rm.OrderedPanelOid IS NULL and rm.id >= %s AND rm.id < %s" % (database_encoded, database_encoded, smin, smax))

      # Get affected Rows
      rows_affected = tenant_cursor.rowcount
      global rowsaffected
      rowsaffected = rows_affected
      print rows_affected

    # Commit the UPDATEs
    tenant.commit()

#Collect All Databases
try:
    #Production Core Connection
    master = mysql.connector.connect(user=master_username, password=master_password, host=master_hostname, database=master_database, ssl_ca='/file/location')

    #Establish master Cursor Connections
    cursor = master.cursor()

    #Get Database List -- Change this out when we go full tenant list!
    query = "select databaseName from master.tenant_environment where databaseName IN ('db1','db2');"
    hc_cursor.execute(query)
    databases = hc_cursor.fetchall()
    tenant_list = list(zip(*databases)[0])
    tenant_listStr = str(tenant_list)

    print "Databases to execute upon: " + tenant_listStr

except Exception as err:
        e = str(err)
        print "Something has gone wrong in collecting databases! " + e

for database in databases:
    databaseStr = str(database)
    database_encoded = databaseStr.strip()[3:-3]
    print "Trying " + database_encoded

    # Production Connections
    tenant = mysql.connector.connect(user=username, password=password, host=hostname_master, database=database_encoded,ssl_ca='/file/location')
    slave = mysql.connector.connect(user=username, password=password, host=hostname_slave, database=database_encoded,ssl_ca='/file/location')

    # Establish Cursor Connections
    tenant_cursor = tenant.cursor()

    # Collect Start Time
    print database_encoded + " start time: " + time.strftime("%Y-%m-%d %H:%M:%S")

    try:
        update_sequence()
        while rowsaffected > 0:
            print "Number of Rows Affected: " + str(rowsaffected) + " at " + time.strftime("%H:%M:%S")
            update_sequence()
        else:
            print "Number of Rows Affected: 0"

    except Exception as err:
        e = str(err)
        print database_encoded + ".result.orderedPanelOid was not populated! Process failed! " + e

    #Collect End Time
    print database_encoded + " completion time: " + time.strftime("%Y-%m-%d %H:%M:%S") + "\n"