根据主题名称验证主题是否存在
Verify if a topic exists based on topic name
我正在尝试根据主题名称验证主题是否存在。
你知道这是否可能吗?
例如,我想验证名称为 "test" 的主题是否已经存在。
以下是我正在尝试但不起作用的方法,因为 topicsList 包含 topicArns 而不是 topicNames...
topics = sns.get_all_topics()
topicsList = topics['ListTopicsResponse']['ListTopicsResult'['Topics']
if "test" in topicsList:
print("true")
这是一种 hack,但它应该有效:
topics = sns.get_all_topics()
topic_list = topics['ListTopicsResponse']['ListTopicsResult']['Topics']
topic_names = [t['TopicArn'].split(':')[5] for t in topic_list]
if 'test' in topic_names:
print(True)
如果您的主题超过 100 个,此代码将有效
def get_topic(token=None):
topics = self.sns.get_all_topics(token)
next_token = topics['ListTopicsResponse']['ListTopicsResult']['NextToken']
topic_list = topics['ListTopicsResponse']['ListTopicsResult']['Topics']
for topic in topic_list:
if "your_topic_name" in topic['TopicArn'].split(':')[5]:
return topic['TopicArn']
else:
if next_token:
get_topic(next_token)
else:
return None
如果您尝试捕获 An error occurred (NotFound) when calling the GetTopicAttributes operation: Topic does not exist
异常怎么办?
from botocore.exceptions import ClientError
topic_arn = "arn:aws:sns:us-east-1:999999999:neverFound"
try:
response = client.get_topic_attributes(
TopicArn=topic_arn
)
print "Exists"
except ClientError as e:
# Validate if is this:
# An error occurred (NotFound) when calling the GetTopicAttributes operation: Topic does not exist
print "Does not exists"
我正在尝试根据主题名称验证主题是否存在。
你知道这是否可能吗?
例如,我想验证名称为 "test" 的主题是否已经存在。
以下是我正在尝试但不起作用的方法,因为 topicsList 包含 topicArns 而不是 topicNames...
topics = sns.get_all_topics()
topicsList = topics['ListTopicsResponse']['ListTopicsResult'['Topics']
if "test" in topicsList:
print("true")
这是一种 hack,但它应该有效:
topics = sns.get_all_topics()
topic_list = topics['ListTopicsResponse']['ListTopicsResult']['Topics']
topic_names = [t['TopicArn'].split(':')[5] for t in topic_list]
if 'test' in topic_names:
print(True)
如果您的主题超过 100 个,此代码将有效
def get_topic(token=None):
topics = self.sns.get_all_topics(token)
next_token = topics['ListTopicsResponse']['ListTopicsResult']['NextToken']
topic_list = topics['ListTopicsResponse']['ListTopicsResult']['Topics']
for topic in topic_list:
if "your_topic_name" in topic['TopicArn'].split(':')[5]:
return topic['TopicArn']
else:
if next_token:
get_topic(next_token)
else:
return None
如果您尝试捕获 An error occurred (NotFound) when calling the GetTopicAttributes operation: Topic does not exist
异常怎么办?
from botocore.exceptions import ClientError
topic_arn = "arn:aws:sns:us-east-1:999999999:neverFound"
try:
response = client.get_topic_attributes(
TopicArn=topic_arn
)
print "Exists"
except ClientError as e:
# Validate if is this:
# An error occurred (NotFound) when calling the GetTopicAttributes operation: Topic does not exist
print "Does not exists"