Celery with Eventlet fails when uploading to S3 (boto3)

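I have been trying to upload to S3 with boto3 from Celery running on Eventlet, but it hangs during the upload. With Celery running without eventlet, the same uploads work fine. Whether or not I monkey patch does not seem to change the outcome.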

Here is the code:

import os

from boto3.session import Session


def getS3():
    s3_access_key, s3_secret_key = os.getenv('S3_ACCESS_KEY'), os.getenv('S3_SECRET_KEY')

    if not all([s3_access_key, s3_secret_key]):
        raise ValueError('Keys')

    session = Session(aws_access_key_id=s3_access_key,
                      aws_secret_access_key=s3_secret_key,
                      region_name='us-east-1')
    return session.resource('s3')


def upload_to_awss3(zip_io, zip_name, bucket, endpoint='s3-sa-east-1.amazonaws.com'):
    s3 = getS3()

    try:
        s3_object = s3.Bucket(bucket).put_object(Key=zip_name, Body=zip_io, ACL='public-read')

        return 'http://{}.{}/{}'.format(bucket, endpoint, zip_name)
    except Exception as e:
        print('upload error: ', e)
        raise

This is the Celery output:

[2016-05-24 11:43:06,878: DEBUG/MainProcess] Sending http request: <PreparedRequest [PUT]>
[2016-05-24 11:43:06,879: INFO/MainProcess] Resetting dropped connection: pontotel-docs.s3.amazonaws.com
[2016-05-24 11:43:07,069: DEBUG/MainProcess] Waiting for 100 Continue response.
[2016-05-24 11:43:47,176: DEBUG/MainProcess] 100 Continue response seen, now sending request body.
[2016-05-24 11:43:47,294: DEBUG/MainProcess] ConnectionError received when sending HTTP request.

And this is the error:

[2016-05-24 12:30:04,800: DEBUG/MainProcess] Response headers: {'x-amz-id-2': 'VwcX2j4FmBoE2oUyH+08V0bh+ZW74vGOF0IkSP2h5KUp07ANcw8qOwexZyv5yupmaXOxiyYbiCg=', 'x-amz-request-id': '2F527FED26157010', 'content-type': 'application/xml', 'date': 'Tue, 24 May 2016 15:29:45 GMT', 'transfer-encoding': 'chunked', 'server': 'AmazonS3', 'connection': 'close'}
[2016-05-24 12:30:04,801: DEBUG/MainProcess] Response body:
b'<?xml version="1.0" encoding="UTF-8"?>\n<Error><Code>RequestTimeout</Code><Message>Your socket connection to the server was not read from or written to within the timeout period. Idle connections will be closed.</Message><RequestId>2F527FED26157010</RequestId><HostId>VwcX2j4FmBoE2oUyH+08V0bh+ZW74vGOF0IkSP2h5KUp07ANcw8qOwexZyv5yupmaXOxiyYbiCg=</HostId></Error>'
[2016-05-24 12:30:04,802: DEBUG/MainProcess] Event needs-retry.s3.PutObject: calling handler <botocore.retryhandler.RetryHandler object at 0x7fd7480694a8>
[2016-05-24 12:30:04,802: DEBUG/MainProcess] retry needed: matching HTTP status and error code seen: 400, RequestTimeout

Any ideas would be much appreciated! :-D

This is not related to Celery.

It is caused by a known bug in Eventlet, triggered by the PUT with "Expect: 100-continue" sequence over an HTTPS connection: https://github.com/eventlet/eventlet/issues/315
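For readers unfamiliar with that sequence, here is a minimal sketch of the handshake over a plain local socket, using only the standard library. It does not reproduce the Eventlet bug (there is no TLS and no green threads here); it only shows the order of events seen in the log: headers go out, the client waits for "100 Continue", then the body is sent. The names serve_once, put_with_expect and demo, and the path and payload, are made up for illustration.

```python
import socket
import threading


def serve_once(listener, received):
    """Hypothetical stand-in server: honor Expect: 100-continue for one request."""
    conn, _ = listener.accept()
    conn.settimeout(5)
    with conn, conn.makefile("rb") as reader:
        reader.readline()  # request line, e.g. b"PUT /key.zip HTTP/1.1"
        headers = {}
        while True:
            line = reader.readline()
            if line in (b"\r\n", b"\n", b""):
                break  # blank line ends the header block
            name, _, value = line.decode("ascii").partition(":")
            headers[name.strip().lower()] = value.strip()
        if headers.get("expect", "").lower() == "100-continue":
            # Interim response: tells the client to go ahead with the body.
            conn.sendall(b"HTTP/1.1 100 Continue\r\n\r\n")
        received.append(reader.read(int(headers.get("content-length", "0"))))
        conn.sendall(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n")


def put_with_expect(host, port, path, body, wait=5.0):
    """Send PUT headers, wait for the interim response, then send the body."""
    with socket.create_connection((host, port), timeout=wait) as sock:
        head = (
            "PUT {} HTTP/1.1\r\n"
            "Host: {}\r\n"
            "Content-Length: {}\r\n"
            "Expect: 100-continue\r\n"
            "\r\n"
        ).format(path, host, len(body))
        sock.sendall(head.encode("ascii"))
        with sock.makefile("rb") as reader:
            # "Waiting for 100 Continue response." in the log is this read.
            interim = reader.readline()
            reader.readline()  # blank line that ends the interim response
            if interim.split()[1] != b"100":
                raise RuntimeError("expected 100 Continue, got %r" % interim)
            sock.sendall(body)  # "now sending request body."
            final = reader.readline()
    return interim.decode("ascii").strip(), final.decode("ascii").strip()


def demo():
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    listener.listen(1)
    received = []
    server = threading.Thread(target=serve_once, args=(listener, received), daemon=True)
    server.start()
    try:
        port = listener.getsockname()[1]
        result = put_with_expect("127.0.0.1", port, "/key.zip", b"zip-bytes")
    finally:
        server.join(timeout=5)
        listener.close()
    return result, received
```

In the question's log the gap between "Waiting for 100 Continue response." and "100 Continue response seen" is about 40 seconds, which is consistent with the read of the interim response stalling on the client side.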

Sorry, a proper fix is not ready yet.

Temporary workaround (also posted in https://github.com/eventlet/eventlet/issues/313):

import eventlet.green.ssl

def _green_ssl_recv_into(self, buffer, nbytes=None, flags=0):
    if self._sslobj:
        if flags != 0:
            raise ValueError(
                "non-zero flags not allowed in calls to recv_into() on %s" %
                self.__class__)
        if nbytes is None:
            if buffer:
                nbytes = len(buffer)
            else:
                nbytes = 1024
        read = self.read(nbytes, buffer)
        return read
    else:
        while True:
            try:
                return eventlet.green.ssl.socket.recv_into(self, buffer, nbytes, flags)
            except eventlet.green.ssl.orig_socket.error as e:
                if self.act_non_blocking:
                    raise
                erno = eventlet.green.ssl.get_errno(e)
                if erno in eventlet.green.ssl.greenio.SOCKET_BLOCKING:
                    try:
                        eventlet.green.ssl.trampoline(
                            self, read=True,
                            timeout=self.gettimeout(), timeout_exc=eventlet.green.ssl.timeout_exc('timed out'))
                    except eventlet.green.ssl.IOClosed:
                        return b''
                elif erno in eventlet.green.ssl.greenio.SOCKET_CLOSED:
                    return b''
                raise

eventlet.green.ssl.GreenSSLSocket.recv_into = _green_ssl_recv_into