为什么我有时会在使用 SQS 客户端时出现 Key Error
Why do I sometimes get Key Error using SQS client
我正在使用 boto3 SQS 客户端从 AWS SQS FIFO 队列接收消息。
def consume_msgs():
sqs = None
try:
sqs = boto3.client('sqs',
region_name=S3_BUCKET_REGION,
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
except Exception:
logger.warning('SQS client error {}'.format(sys.exc_info()[0]))
logger.error(traceback.format_exc())
### more code to process message
该应用程序使用 upstart
在 EC2 上设置为服务。大多数时候它工作正常。但有时当我在更改代码后重新启动服务时,应用程序会退出并出现以下错误
2018-10-06 01:29:38,654 WARNING SQS client error <class 'KeyError'>
2018-10-06 01:29:38,658 WARNING SQS client error <class 'KeyError'>
2018-10-06 01:29:38,663 ERROR Traceback (most recent call last):
File "/home/ec2-user/aae_client/app/run.py", line 194, in consume_msgs
aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/boto3/__init__.py", line 83, in client
return _get_default_session().client(*args, **kwargs)
File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/boto3/session.py", line 263, in client
aws_session_token=aws_session_token, config=config)
File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/botocore/session.py", line 851, in create_client
endpoint_resolver = self.get_component('endpoint_resolver')
File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/botocore/session.py", line 726, in get_component
return self._components.get_component(name)
File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/botocore/session.py", line 926, in get_component
del self._deferred[name]
KeyError: 'endpoint_resolver'
重启服务通常可以解决问题。每次我重新启动服务时都不会发生。令人困惑的是导致实际错误回溯的 KeyError
警告。这个 KeyError
到底指的是什么?它不能是 AWS_SECRET_ACCESS_KEY
因为这个键永远不会改变,而且它在大多数时候都工作得很好。这个问题是随机发生的,来来去去。因此很难调试。而且我不明白这个错误是如何逃脱 try..except
块
编辑
根据评论,这似乎与多线程有关。 consume_msg
确实是 运行 被多个线程
def process_msgs():
for i in range(NUM_WORKERS):
t = threading.Thread(target=consume_msgs, name='worker-%s' % i)
t.setDaemon(True)
t.start()
while True:
time.sleep(MAIN_PROCESS_SLEEP_INTERVAL)
这个github issue建议你应该在顶层设置一次sqs客户端(而不是在函数中):
sqs = boto3.client('sqs',
region_name=S3_BUCKET_REGION,
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
def consume_msgs():
# code to process message
我在为 S3 创建客户端时遇到此错误,但据我所知这是同一个问题。
在创建客户端的过程中使用了 non-thread-safe 代码:
if name in self._deferred:
factory = self._deferred[name]
self._components[name] = factory()
# Only delete the component from the deferred dict after
# successfully creating the object from the factory as well as
# injecting the instantiated value into the _components dict.
del self._deferred[name]
(来自 get_component 方法中的 botocore/session.py - 这是在尝试删除由不同线程删除的密钥时引发 KeyError 的代码)
锁定客户端创建为我解决了这个问题(如 https://github.com/boto/boto3/pull/806 中所建议)
也许我误解了其他一些答案,但在多线程执行的情况下,如果这些函数在单独的线程中执行,我认为拥有一个 boto3 客户端对象并将其传递给其他函数是行不通的。我在调用 boto3 客户端服务时遇到了零星的 endpoint_resolver
错误,并且通过遵循 documentation and the comments on boto3 GitHub issues such as #1246 and #1592 中的示例并在每个线程中创建一个单独的会话对象来停止它们。就我而言,这意味着我的代码几乎没有什么变化,从
client = boto3.client(variant, region_name = creds['region_name'],
aws_access_key_id = ...,
aws_secret_access_key = ...)
至
session = boto3.session.Session()
client = session.client(variant, region_name = creds['region_name'],
aws_access_key_id = ...,
aws_secret_access_key = ...)
在单独线程中执行的函数中。我对 consume_msgs()
的 OP 代码的阅读是可以进行类似的更改,它将消除偶尔出现的 endpoint_resolver
错误。
我正在使用 boto3 SQS 客户端从 AWS SQS FIFO 队列接收消息。
def consume_msgs():
sqs = None
try:
sqs = boto3.client('sqs',
region_name=S3_BUCKET_REGION,
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
except Exception:
logger.warning('SQS client error {}'.format(sys.exc_info()[0]))
logger.error(traceback.format_exc())
### more code to process message
该应用程序使用 upstart
在 EC2 上设置为服务。大多数时候它工作正常。但有时当我在更改代码后重新启动服务时,应用程序会退出并出现以下错误
2018-10-06 01:29:38,654 WARNING SQS client error <class 'KeyError'>
2018-10-06 01:29:38,658 WARNING SQS client error <class 'KeyError'>
2018-10-06 01:29:38,663 ERROR Traceback (most recent call last):
File "/home/ec2-user/aae_client/app/run.py", line 194, in consume_msgs
aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/boto3/__init__.py", line 83, in client
return _get_default_session().client(*args, **kwargs)
File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/boto3/session.py", line 263, in client
aws_session_token=aws_session_token, config=config)
File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/botocore/session.py", line 851, in create_client
endpoint_resolver = self.get_component('endpoint_resolver')
File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/botocore/session.py", line 726, in get_component
return self._components.get_component(name)
File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/botocore/session.py", line 926, in get_component
del self._deferred[name]
KeyError: 'endpoint_resolver'
重启服务通常可以解决问题。每次我重新启动服务时都不会发生。令人困惑的是导致实际错误回溯的 KeyError
警告。这个 KeyError
到底指的是什么?它不能是 AWS_SECRET_ACCESS_KEY
因为这个键永远不会改变,而且它在大多数时候都工作得很好。这个问题是随机发生的,来来去去。因此很难调试。而且我不明白这个错误是如何逃脱 try..except
块
编辑
根据评论,这似乎与多线程有关。 consume_msg
确实是 运行 被多个线程
def process_msgs():
for i in range(NUM_WORKERS):
t = threading.Thread(target=consume_msgs, name='worker-%s' % i)
t.setDaemon(True)
t.start()
while True:
time.sleep(MAIN_PROCESS_SLEEP_INTERVAL)
这个github issue建议你应该在顶层设置一次sqs客户端(而不是在函数中):
sqs = boto3.client('sqs',
region_name=S3_BUCKET_REGION,
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
def consume_msgs():
# code to process message
我在为 S3 创建客户端时遇到此错误,但据我所知这是同一个问题。 在创建客户端的过程中使用了 non-thread-safe 代码:
if name in self._deferred:
factory = self._deferred[name]
self._components[name] = factory()
# Only delete the component from the deferred dict after
# successfully creating the object from the factory as well as
# injecting the instantiated value into the _components dict.
del self._deferred[name]
(来自 get_component 方法中的 botocore/session.py - 这是在尝试删除由不同线程删除的密钥时引发 KeyError 的代码)
锁定客户端创建为我解决了这个问题(如 https://github.com/boto/boto3/pull/806 中所建议)
也许我误解了其他一些答案,但在多线程执行的情况下,如果这些函数在单独的线程中执行,我认为拥有一个 boto3 客户端对象并将其传递给其他函数是行不通的。我在调用 boto3 客户端服务时遇到了零星的 endpoint_resolver
错误,并且通过遵循 documentation and the comments on boto3 GitHub issues such as #1246 and #1592 中的示例并在每个线程中创建一个单独的会话对象来停止它们。就我而言,这意味着我的代码几乎没有什么变化,从
client = boto3.client(variant, region_name = creds['region_name'],
aws_access_key_id = ...,
aws_secret_access_key = ...)
至
session = boto3.session.Session()
client = session.client(variant, region_name = creds['region_name'],
aws_access_key_id = ...,
aws_secret_access_key = ...)
在单独线程中执行的函数中。我对 consume_msgs()
的 OP 代码的阅读是可以进行类似的更改,它将消除偶尔出现的 endpoint_resolver
错误。