AWS DMS 使用 DatabaseMigrationService.Client.describe_table_statistics 大结果集丢失记录
AWS DMS Losing records using DatabaseMigrationService.Client.describe_table_statistics with large result set
我正在使用 describe_table_statistics 检索给定 DMS 任务中的 table 列表,并使用响应 ['Marker'] 有条件地循环 describe_table_statistics .
当我不使用过滤器时,我得到了正确的记录数,13k+。
当我使用其结果集少于 MaxRecords 的过滤器或过滤器组合时,我得到了正确的记录数。
但是,当我传入一个会得到比 MaxRecords 更大的记录集的过滤器时,我得到的记录比我应该得到的要少得多。
这是我检索 table 集合的函数:
def get_dms_task_tables(account, region, task_name, schema_name=None, table_state=None):
tables=[]
max_records=500
filters=[]
if schema_name:
filters.append({'Name':'schema-name', 'Values':[schema_name]})
if table_state:
filters.append({'Name':'table-state', 'Values':[table_state]})
task_arn = get_dms_task_arn(account, region, task_name)
session = boto3.Session(profile_name=account, region_name=region)
client = session.client('dms')
response = client.describe_table_statistics(
ReplicationTaskArn=task_arn
,Filters=filters
,MaxRecords=max_records)
tables += response['TableStatistics']
while len(response['TableStatistics']) == max_records:
response = client.describe_table_statistics(
ReplicationTaskArn=task_arn
,Filters=filters
,MaxRecords=max_records
,Marker=response['Marker'])
tables += response['TableStatistics']
return tables
为了排除故障,我在 table 秒内循环打印每 table 一行:
print(', '.join((
t['SchemaName']
,t['TableName']
,t['TableState'])))
当我没有为 'Table completed' 的 table 状态传递任何过滤器和 grep 时,我通过控制台
获得了 12k+ 条记录,这是正确的计数
至少从表面上看,响应循环是有效的。
当我传入模式名称和 table 状态过滤条件时,我得到了正确的计数,正如控制台所确认的那样,但是这个计数小于 MaxRecords。
当我刚刚为 'Table completed' 输入 table 状态过滤器时,我只得到 949 条记录,所以我缺少 11k 条记录。
我试过在循环内从 describe_table_statistics 中省略 Filter 参数,但在所有情况下我得到的结果都是一样的。
我怀疑我在循环中对 describe_table_statistics 的调用有问题,但我一直无法在亚马逊的文档中找到这方面的示例来证实这一点。
应用过滤器时,describe_table_statistics 不遵守 MaxRecords 限制。
事实上,它所做的似乎是检索 (2 x MaxRecords),应用过滤器,然后 return 设置。或者它可能检索 MaxRecords,应用过滤器,然后继续,直到结果集大于 MaxRecords。无论哪种方式,我的 while 条件都是问题所在。
我换了
while len(response['TableStatistics']) == max_records:
和
while 'Marker' in response:
现在函数 return 是正确的记录数。
顺便说一下,我的第一次尝试是
while len(response['TableStatistics']) >= 1:
但是在循环的最后一次迭代中抛出了这个错误:
KeyError: 'Marker'
完成的工作函数现在看起来是这样的:
def get_dms_task_tables(account, region, task_name, schema_name=None, table_state=None):
tables=[]
max_records=500
filters=[]
if schema_name:
filters.append({'Name':'schema-name', 'Values':[schema_name]})
if table_state:
filters.append({'Name':'table-state', 'Values':[table_state]})
task_arn = get_dms_task_arn(account, region, task_name)
session = boto3.Session(profile_name=account, region_name=region)
client = session.client('dms')
response = client.describe_table_statistics(
ReplicationTaskArn=task_arn
,Filters=filters
,MaxRecords=max_records)
tables += response['TableStatistics']
while 'Marker' in response:
response = client.describe_table_statistics(
ReplicationTaskArn=task_arn
,Filters=filters
,MaxRecords=max_records
,Marker=response['Marker'])
tables += response['TableStatistics']
return tables
我正在使用 describe_table_statistics 检索给定 DMS 任务中的 table 列表,并使用响应 ['Marker'] 有条件地循环 describe_table_statistics .
当我不使用过滤器时,我得到了正确的记录数,13k+。 当我使用其结果集少于 MaxRecords 的过滤器或过滤器组合时,我得到了正确的记录数。
但是,当我传入一个会得到比 MaxRecords 更大的记录集的过滤器时,我得到的记录比我应该得到的要少得多。
这是我检索 table 集合的函数:
def get_dms_task_tables(account, region, task_name, schema_name=None, table_state=None):
tables=[]
max_records=500
filters=[]
if schema_name:
filters.append({'Name':'schema-name', 'Values':[schema_name]})
if table_state:
filters.append({'Name':'table-state', 'Values':[table_state]})
task_arn = get_dms_task_arn(account, region, task_name)
session = boto3.Session(profile_name=account, region_name=region)
client = session.client('dms')
response = client.describe_table_statistics(
ReplicationTaskArn=task_arn
,Filters=filters
,MaxRecords=max_records)
tables += response['TableStatistics']
while len(response['TableStatistics']) == max_records:
response = client.describe_table_statistics(
ReplicationTaskArn=task_arn
,Filters=filters
,MaxRecords=max_records
,Marker=response['Marker'])
tables += response['TableStatistics']
return tables
为了排除故障,我在 table 秒内循环打印每 table 一行:
print(', '.join((
t['SchemaName']
,t['TableName']
,t['TableState'])))
当我没有为 'Table completed' 的 table 状态传递任何过滤器和 grep 时,我通过控制台
获得了 12k+ 条记录,这是正确的计数至少从表面上看,响应循环是有效的。
当我传入模式名称和 table 状态过滤条件时,我得到了正确的计数,正如控制台所确认的那样,但是这个计数小于 MaxRecords。
当我刚刚为 'Table completed' 输入 table 状态过滤器时,我只得到 949 条记录,所以我缺少 11k 条记录。
我试过在循环内从 describe_table_statistics 中省略 Filter 参数,但在所有情况下我得到的结果都是一样的。
我怀疑我在循环中对 describe_table_statistics 的调用有问题,但我一直无法在亚马逊的文档中找到这方面的示例来证实这一点。
应用过滤器时,describe_table_statistics 不遵守 MaxRecords 限制。
事实上,它所做的似乎是检索 (2 x MaxRecords),应用过滤器,然后 return 设置。或者它可能检索 MaxRecords,应用过滤器,然后继续,直到结果集大于 MaxRecords。无论哪种方式,我的 while 条件都是问题所在。
我换了
while len(response['TableStatistics']) == max_records:
和
while 'Marker' in response:
现在函数 return 是正确的记录数。
顺便说一下,我的第一次尝试是
while len(response['TableStatistics']) >= 1:
但是在循环的最后一次迭代中抛出了这个错误:
KeyError: 'Marker'
完成的工作函数现在看起来是这样的:
def get_dms_task_tables(account, region, task_name, schema_name=None, table_state=None):
tables=[]
max_records=500
filters=[]
if schema_name:
filters.append({'Name':'schema-name', 'Values':[schema_name]})
if table_state:
filters.append({'Name':'table-state', 'Values':[table_state]})
task_arn = get_dms_task_arn(account, region, task_name)
session = boto3.Session(profile_name=account, region_name=region)
client = session.client('dms')
response = client.describe_table_statistics(
ReplicationTaskArn=task_arn
,Filters=filters
,MaxRecords=max_records)
tables += response['TableStatistics']
while 'Marker' in response:
response = client.describe_table_statistics(
ReplicationTaskArn=task_arn
,Filters=filters
,MaxRecords=max_records
,Marker=response['Marker'])
tables += response['TableStatistics']
return tables