为什么我有时会在使用 SQS 客户端时出现 Key Error

Why do I sometimes get Key Error using SQS client

我正在使用 boto3 SQS 客户端从 AWS SQS FIFO 队列接收消息。

def consume_msgs():
    sqs = None
    try:
        sqs = boto3.client('sqs',
                       region_name=S3_BUCKET_REGION,
                       aws_access_key_id=AWS_ACCESS_KEY_ID,
                       aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
    except Exception:
        logger.warning('SQS client error {}'.format(sys.exc_info()[0]))
        logger.error(traceback.format_exc())

  ### more code to process message

该应用程序使用 upstart 在 EC2 上设置为服务。大多数时候它工作正常。但有时当我在更改代码后重新启动服务时,应用程序会退出并出现以下错误

2018-10-06 01:29:38,654 WARNING SQS client error <class 'KeyError'>
2018-10-06 01:29:38,658 WARNING SQS client error <class 'KeyError'>
2018-10-06 01:29:38,663 ERROR Traceback (most recent call last):
  File "/home/ec2-user/aae_client/app/run.py", line 194, in consume_msgs
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
  File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/boto3/__init__.py", line 83, in client
    return _get_default_session().client(*args, **kwargs)
  File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/boto3/session.py", line 263, in client
    aws_session_token=aws_session_token, config=config)
  File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/botocore/session.py", line 851, in create_client
    endpoint_resolver = self.get_component('endpoint_resolver')
  File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/botocore/session.py", line 726, in get_component
    return self._components.get_component(name)
  File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/botocore/session.py", line 926, in get_component
    del self._deferred[name]
KeyError: 'endpoint_resolver'

重启服务通常可以解决问题。每次我重新启动服务时都不会发生。令人困惑的是导致实际错误回溯的 KeyError 警告。这个 KeyError 到底指的是什么?它不能是 AWS_SECRET_ACCESS_KEY 因为这个键永远不会改变,而且它在大多数时候都工作得很好。这个问题是随机发生的,来来去去。因此很难调试。而且我不明白这个错误是如何逃脱 try..except

编辑

根据评论,这似乎与多线程有关。 consume_msg 确实是 运行 被多个线程 def process_msgs():

for i in range(NUM_WORKERS):
    t = threading.Thread(target=consume_msgs, name='worker-%s' % i)
    t.setDaemon(True)
    t.start()
while True:
    time.sleep(MAIN_PROCESS_SLEEP_INTERVAL)

这个github issue建议你应该在顶层设置一次sqs客户端(而不是在函数中):

sqs = boto3.client('sqs',
                   region_name=S3_BUCKET_REGION,
                   aws_access_key_id=AWS_ACCESS_KEY_ID,
                   aws_secret_access_key=AWS_SECRET_ACCESS_KEY)


def consume_msgs():
    # code to process message

我在为 S3 创建客户端时遇到此错误,但据我所知这是同一个问题。 在创建客户端的过程中使用了 non-thread-safe 代码:

        if name in self._deferred:
            factory = self._deferred[name]
            self._components[name] = factory()
            # Only delete the component from the deferred dict after
            # successfully creating the object from the factory as well as
            # injecting the instantiated value into the _components dict.
            del self._deferred[name]

(来自 get_component 方法中的 botocore/session.py - 这是在尝试删除由不同线程删除的密钥时引发 KeyError 的代码)

锁定客户端创建为我解决了这个问题(如 https://github.com/boto/boto3/pull/806 中所建议)

也许我误解了其他一些答案,但在多线程执行的情况下,如果这些函数在单独的线程中执行,我认为拥有一个 boto3 客户端对象并将其传递给其他函数是行不通的。我在调用 boto3 客户端服务时遇到了零星的 endpoint_resolver 错误,并且通过遵循 documentation and the comments on boto3 GitHub issues such as #1246 and #1592 中的示例并在每个线程中创建一个单独的会话对象来停止它们。就我而言,这意味着我的代码几乎没有什么变化,从

client = boto3.client(variant, region_name = creds['region_name'],
                      aws_access_key_id = ...,
                      aws_secret_access_key = ...)

session = boto3.session.Session()
client = session.client(variant, region_name = creds['region_name'],
                        aws_access_key_id = ...,
                        aws_secret_access_key = ...)

在单独线程中执行的函数中。我对 consume_msgs() 的 OP 代码的阅读是可以进行类似的更改,它将消除偶尔出现的 endpoint_resolver 错误。