AWS DMS 使用 DatabaseMigrationService.Client.describe_table_statistics 大结果集丢失记录

AWS DMS Losing records using DatabaseMigrationService.Client.describe_table_statistics with large result set

我正在使用 describe_table_statistics 检索给定 DMS 任务中的 table 列表,并使用响应 ['Marker'] 有条件地循环 describe_table_statistics .

当我不使用过滤器时,我得到了正确的记录数,13k+。 当我使用其结果集少于 MaxRecords 的过滤器或过滤器组合时,我得到了正确的记录数。

但是,当我传入一个会得到比 MaxRecords 更大的记录集的过滤器时,我得到的记录比我应该得到的要少得多。

这是我检索 table 集合的函数:

def get_dms_task_tables(account, region, task_name, schema_name=None, table_state=None):
   tables=[]
   max_records=500

   filters=[]
   if schema_name:
      filters.append({'Name':'schema-name', 'Values':[schema_name]})
   if table_state:
      filters.append({'Name':'table-state', 'Values':[table_state]})

   task_arn = get_dms_task_arn(account, region, task_name)

   session = boto3.Session(profile_name=account, region_name=region)
   client = session.client('dms')

   response = client.describe_table_statistics(
      ReplicationTaskArn=task_arn
      ,Filters=filters
      ,MaxRecords=max_records)

   tables += response['TableStatistics']

   while len(response['TableStatistics']) == max_records:
      response = client.describe_table_statistics(
         ReplicationTaskArn=task_arn
         ,Filters=filters
         ,MaxRecords=max_records
         ,Marker=response['Marker'])

      tables += response['TableStatistics']

   return tables

为了排除故障,我在 table 秒内循环打印每 table 一行:

        print(', '.join((
            t['SchemaName']
            ,t['TableName']
            ,t['TableState'])))

当我没有为 'Table completed' 的 table 状态传递任何过滤器和 grep 时,我通过控制台

获得了 12k+ 条记录,这是正确的计数

至少从表面上看,响应循环是有效的。

当我传入模式名称和 table 状态过滤条件时,我得到了正确的计数,正如控制台所确认的那样,但是这个计数小于 MaxRecords。

当我刚刚为 'Table completed' 输入 table 状态过滤器时,我只得到 949 条记录,所以我缺少 11k 条记录。

我试过在循环内从 describe_table_statistics 中省略 Filter 参数,但在所有情况下我得到的结果都是一样的。

我怀疑我在循环中对 describe_table_statistics 的调用有问题,但我一直无法在亚马逊的文档中找到这方面的示例来证实这一点。

应用过滤器时,describe_table_statistics 不遵守 MaxRecords 限制。

事实上,它所做的似乎是检索 (2 x MaxRecords),应用过滤器,然后 return 设置。或者它可能检索 MaxRecords,应用过滤器,然后继续,直到结果集大于 MaxRecords。无论哪种方式,我的 while 条件都是问题所在。

我换了

while len(response['TableStatistics']) == max_records:

while 'Marker' in response:

现在函数 return 是正确的记录数。

顺便说一下,我的第一次尝试是

while len(response['TableStatistics']) >= 1:

但是在循环的最后一次迭代中抛出了这个错误:

KeyError: 'Marker'

完成的工作函数现在看起来是这样的:

def get_dms_task_tables(account, region, task_name, schema_name=None, table_state=None):
   tables=[]
   max_records=500

   filters=[]
   if schema_name:
      filters.append({'Name':'schema-name', 'Values':[schema_name]})
   if table_state:
      filters.append({'Name':'table-state', 'Values':[table_state]})

   task_arn = get_dms_task_arn(account, region, task_name)

   session = boto3.Session(profile_name=account, region_name=region)
   client = session.client('dms')

   response = client.describe_table_statistics(
      ReplicationTaskArn=task_arn
      ,Filters=filters
      ,MaxRecords=max_records)

   tables += response['TableStatistics']

   while 'Marker' in response:
      response = client.describe_table_statistics(
         ReplicationTaskArn=task_arn
         ,Filters=filters
         ,MaxRecords=max_records
         ,Marker=response['Marker'])

      tables += response['TableStatistics']

   return tables