AWS - "The request signature we calculated does not match the signature you provided" 与 IAM 用户和来自 python 的请求

AWS - "The request signature we calculated does not match the signature you provided" with IAM user and requests from python

我在 AWS API 网关中有一个 API 使用 POST 方法,它连接到 AWS Lambda 进行一些操作。我需要在一个只安装标准库的环境中从 Python-3.8 调用此代码。所以,我发现 these on AWS Docs 可以签署对 API 网关的请求。

这是我尝试从博客和 Postman 复制内容后的结果。

# everything below is from
# https://docs.aws.amazon.com/code-samples/latest/catalog/python-signv4-v4-signing-get-post.py.html

from datetime import datetime
import hashlib
import hmac
import json
import os
import requests
from requests.api import head

# initial parameters
method = "POST"
service = "execute-api"
region = "ap-south-1"
host = f"<my-api-id>.{service}.{region}.amazonaws.com"
endpoint = f"https://{host}/experiment-stage"


# set additional headers
content_type = "application/json"

# Read AWS Credentials from environment variables.
access_key = os.getenv('AWS_ACCESS_KEY_API_INVOKE')
secret_key = os.getenv('AWS_SECRTE_ACCESS_KEY_API_INVOKE')

# Create a date for headers and the credential string
t = datetime.utcnow()
amz_date = t.strftime("%Y%m%dT%H%M%SZ")
date_stamp = t.strftime("%Y%m%d")  # Date w/o time, used in credential scope

# start creating canonical string.
canonical_uri = "/"

# request_parameters = json.dumps("{\"even\" : [1, 2, 3, 4], \"odd\" : [1, 2, 3, 4]}")
request_parameters = json.dumps("{\"even\" : [1,2,3,4], \"odd\" : [1,2,3,4]}")

canonical_querystring = ""

# Step 4: Create the canonical headers. Header names must be trimmed
canonical_headers = f"content-type:{content_type}\nhost:{host}\nx-amz-date:{amz_date}\n"

signed_headers = "host;x-amz-content-sha256;x-amz-date"


def sign(key, msg):
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()


def getSignatureKey(key, date_stamp, regionName, serviceName):
    kDate = sign(("AWS4" + key).encode("utf-8"), date_stamp)
    kRegion = sign(kDate, regionName)
    kService = sign(kRegion, serviceName)
    kSigning = sign(kService, "aws4_request")
    return kSigning


# Step 6: Create payload hash. In this example, the payload (body of
# the request) contains the request parameters.
payload_hash = hashlib.sha256(request_parameters.encode('utf-8')).hexdigest()

# Step 7: Combine elements to create canonical request
# canonical_request = method + '\n' + canonical_uri + '\n' + canonical_querystring + '\n' + canonical_headers + '\n' + signed_headers + '\n' + payload_hash
canonical_request = f"{method}\n{canonical_uri}\n{canonical_querystring}\n{canonical_headers}\n{signed_headers}\n{payload_hash}"

# ************* TASK 2: CREATE THE STRING TO SIGN*************
algorithm = "AWS4-HMAC-SHA256"
credential_scope = f"{date_stamp}/{region}/{service}/aws4_request"

# now sing the string
string_to_sign = f"{algorithm}\n{amz_date}\n{credential_scope}\n{hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()}\n"


# ************* TASK 3: CALCULATE THE SIGNATURE *************
# Create the signing key using the function defined above.
signing_key = getSignatureKey(secret_key, date_stamp, region, service)

# Sign the string_to_sign using the signing_key
signature = hmac.new(signing_key, string_to_sign.encode('utf-8'), hashlib.sha256).hexdigest()


# ************* TASK 4: ADD SIGNING INFORMATION TO THE REQUEST *************
authorization_header = f"{algorithm} Credential={access_key}/{credential_scope}, SignedHeaders={signed_headers}, Signature={signature}"

headers = {"X-Amz-Content-Sha256": payload_hash,
           "X-Amz-Date": amz_date,
           "Content-Type": "application/json",
           "Host": host,
           "Authorization": authorization_header
           }

print(headers)

response = requests.request(method=method, url=endpoint, headers=headers, data=request_parameters)

print(f'Response code: {response.status_code}; Seconds Elapsed: {response.elapsed.total_seconds()};')
print(response.text)

我得到的错误是:

$ /bin/python3 /home/naveen/.../with_requests_aws.py
{'X-Amz-Content-Sha256': '64cfb8d65af84614135d2de33dd26751ef2384f46579355de226c5062b2537e1', 'X-Amz-Date': '20210912T171926Z', 'Content-Type': 'application/json', 'Host': '<my-api-id>.execute-api.ap-south-1.amazonaws.com', 'Authorization': 'AWS4-HMAC-SHA256 Credential=<my-API-key-ID>/20210912/ap-south-1/execute-api/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=6a6fdb5bcf40a17ea6377b497ee28e9c8252899914b1348e87ab9876e6140b6d'}
Response code: 403; Seconds Elapsed: 0.530462;
{"message":"The request signature we calculated does not match the signature you provided. Check your AWS Secret Access Key and signing method. Consult the service documentation for details.\n\nThe Canonical String for this request should have been\n'POST\n/experiment-stage\n\nhost:<my-api-id>.execute-api.ap-south-1.amazonaws.com\nx-amz-content-sha256:64cfb8d65af84614135d2de33dd26751ef2384f46579355de226c5062b2537e1\nx-amz-date:20210912T171926Z\n\nhost;x-amz-content-sha256;x-amz-date\n64cfb8d65af84614135d2de33dd26751ef2384f46579355de226c5062b2537e1'\n\nThe String-to-Sign should have been\n'AWS4-HMAC-SHA256\n20210912T171926Z\n20210912/ap-south-1/execute-api/aws4_request\na48c1087cbce204f6f960dd7cebc064b8bd89022fba6644ccf4638eec4d65091'\n"}

当我在授权选项卡中添加访问密钥凭据时,它的工作方式与 Postman 预期的一样:

这对我有用,特别是 AWS API HTTP POST 请求网关:

import requests
import json
from urllib.parse import quote, urlparse
import hmac
import hashlib
from datetime import datetime

def get_canonical_path(url):
    """
    Create canonical URI--the part of the URI from domain to query
    string (use '/' if no path)
    """
    parsedurl = urlparse(url)

    # safe chars adapted from boto's use of urllib.parse.quote
    # https://github.com/boto/boto/blob/d9e5cfe900e1a58717e393c76a6e3580305f217a/boto/auth.py#L393
    return quote(parsedurl.path if parsedurl.path else '/', safe='/-_.~')


def get_canonical_querystring(url):
    """
    Create the canonical query string. According to AWS, by the
    end of this function our query string values must
    be URL-encoded (space=%20) and the parameters must be sorted
    by name.
    This method assumes that the query params in `r` are *already*
    url encoded.  If they are not url encoded by the time they make
    it to this function, AWS may complain that the signature for your
    request is incorrect.
    It appears elasticsearc-py url encodes query paramaters on its own:
        https://github.com/elastic/elasticsearch-py/blob/5dfd6985e5d32ea353d2b37d01c2521b2089ac2b/elasticsearch/connection/http_requests.py#L64
    If you are using a different client than elasticsearch-py, it
    will be your responsibility to urleconde your query params before
    this method is called.
    """
    canonical_querystring = ''

    parsedurl = urlparse(url)
    querystring_sorted = '&'.join(sorted(parsedurl.query.split('&')))

    for query_param in querystring_sorted.split('&'):
        key_val_split = query_param.split('=', 1)

        key = key_val_split[0]
        if len(key_val_split) > 1:
            val = key_val_split[1]
        else:
            val = ''

        if key:
            if canonical_querystring:
                canonical_querystring += "&"
            canonical_querystring += u'='.join([key, val])

    return canonical_querystring


def sign(key, msg):
    """
    Copied from https://docs.aws.amazon.com/general/latest/gr/sigv4-signed-request-examples.html
    """
    return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()


def getSignatureKey(key, dateStamp, regionName, serviceName):
    """
    Copied from https://docs.aws.amazon.com/general/latest/gr/sigv4-signed-request-examples.html
    """
    kDate = sign(('AWS4' + key).encode('utf-8'), dateStamp)
    kRegion = sign(kDate, regionName)
    kService = sign(kRegion, serviceName)
    kSigning = sign(kService, 'aws4_request')
    return kSigning


def get_headers(aws_host: str, url:str, body: str, aws_region:str, service:str, aws_access_key:str, aws_secret_access_key:str):
    # Create a date for headers and the credential string
    t = datetime.utcnow()
    amzdate = t.strftime('%Y%m%dT%H%M%SZ')
    datestamp = t.strftime('%Y%m%d')  # Date w/o time for credential_scope

    canonical_uri = get_canonical_path(url)

    canonical_querystring = get_canonical_querystring(url)

    # Create the canonical headers and signed headers. Header names
    # and value must be trimmed and lowercase, and sorted in ASCII order.
    # Note that there is a trailing \n.
    canonical_headers = ('host:' + aws_host + '\n' +
                            'x-amz-date:' + amzdate + '\n')

    # Create the list of signed headers. This lists the headers
    # in the canonical_headers list, delimited with ";" and in alpha order.
    # Note: The request can include any headers; canonical_headers and
    # signed_headers lists those that you want to be included in the
    # hash of the request. "Host" and "x-amz-date" are always required.
    signed_headers = 'host;x-amz-date'

    payload_hash = hashlib.sha256(body).hexdigest()

    # Combine elements to create create canonical request
    canonical_request = ('POST' + '\n' + canonical_uri + '\n' +
                            canonical_querystring + '\n' + canonical_headers +
                            '\n' + signed_headers + '\n' + payload_hash)

    # Match the algorithm to the hashing algorithm you use, either SHA-1 or
    # SHA-256 (recommended)
    algorithm = 'AWS4-HMAC-SHA256'
    credential_scope = (datestamp + '/' + aws_region + '/' +
                        service + '/' + 'aws4_request')
    string_to_sign = (algorithm + '\n' + amzdate + '\n' + credential_scope +
                        '\n' + hashlib.sha256(canonical_request.encode('utf-8')).hexdigest())

    # Create the signing key using the function defined above.
    signing_key = getSignatureKey(aws_secret_access_key, datestamp, aws_region, service)

    # Sign the string_to_sign using the signing_key
    string_to_sign_utf8 = string_to_sign.encode('utf-8')
    signature = hmac.new(signing_key,
                            string_to_sign_utf8,
                            hashlib.sha256).hexdigest()

    # The signing information can be either in a query string value or in
    # a header named Authorization. This code shows how to use a header.
    # Create authorization header and add to request headers
    authorization_header = (algorithm + ' ' + 'Credential=' + aws_access_key +
                            '/' + credential_scope + ', ' + 'SignedHeaders=' +
                            signed_headers + ', ' + 'Signature=' + signature)

    headers = {
        'Authorization': authorization_header,
        'x-amz-date': amzdate,
        'x-amz-content-sha256': payload_hash,
        'Content-Type': 'application/json'
    }

    return headers



aws_host = '<api-id>.execute-api.ap-south-1.amazonaws.com'
url = 'https://<api-id>.execute-api.ap-south-1.amazonaws.com/<my-endpoint>'

body = json.dumps({"key1":[<values>], "key2":[<values>]})
body = body.encode('utf-8')

aws_region = 'ap-south-1'
service = 'execute-api'

aws_access_key = '<access-key>'
aws_secret_access_key = '<secret-access-key>'

headers = get_headers(aws_host=aws_host, url=url, body=body, service=service,
                      aws_region=aws_region, aws_access_key=aws_access_key, aws_secret_access_key=aws_secret_access_key)

print(headers)

response = requests.post(url=url, headers=headers, data=body)

print(response.status_code, response.text)