通过设置 API 网关为 Amazon Kinesis 调用 REST API

Call REST API for Amazon Kinesis with Setting up API Gateway

我正在尝试发送 HTTP Post 请求以将记录放入 Amazon Kinesis Stream。有几种方法(Kinesis 客户端、KPL、将 AWS 网关设置为 Kinesis 代理)。

我看到了这篇关于 Kinesis PutRecord 的文档 API http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html

POST / HTTP/1.1
Host: kinesis.<region>.<domain>
Content-Length: <PayloadSizeBytes>
User-Agent: <UserAgentString>
Content-Type: application/x-amz-json-1.1
Authorization: <AuthParams>
Connection: Keep-Alive 
X-Amz-Date: <Date>
X-Amz-Target: Kinesis_20131202.PutRecord
{
  "StreamName": "exampleStreamName",
  "Data": "XzxkYXRhPl8x",
  "PartitionKey": "partitionKey"
}

是否可以将上述 HTTP POST 请求发送到 PutRecord 而无需设置 Amazon API 网关,如此 link 中所述: http://docs.aws.amazon.com/apigateway/latest/developerguide/use-custom-authorizer.html#call-api-with-api-gateway-custom-authorization

KPL 和 Kinesis 客户端必须以某种方式在内部使用 HTTP POST 到 PutRecord,因此必须有办法做到这一点。不幸的是,我在网上找不到任何资源。

import sys, os, base64, datetime, hashlib, hmac
import requests # pip install requests
# ************* REQUEST VALUES *************
method = 'POST'
service = 'kinesis'
host = 'kinesis.us-east-1.amazonaws.com'
region = 'us-east-1'
endpoint = 'https://kinesis.us-east-1.amazonaws.com'
content_type = 'application/x-amz-json-1.1'

amz_target = 'Kinesis_20131202.PutRecord'
request_parameters = '{'
request_parameters += '"StreamName": "<StreamName>",'
request_parameters += '"Data": "' + base64.b64encode(<DATA>) + '",'
request_parameters += '"PartitionKey": "<PartitionKey>"'
request_parameters += '}'

# http://docs.aws.amazon.com/general/latest/gr/signature-v4-examples.html#signature-v4-examples-python
def sign(key, msg):
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()
def getSignatureKey(key, date_stamp, regionName, serviceName):
    kDate = sign(('AWS4' + key).encode('utf-8'), date_stamp)
    kRegion = sign(kDate, regionName)
    kService = sign(kRegion, serviceName)
    kSigning = sign(kService, 'aws4_request')
    return kSigning

access_key = '<ACCESS KEY>'
secret_key = '<SECRET KEY>'
if access_key is None or secret_key is None:
    print 'No access key is available.'
    sys.exit()
# Create a date for headers and the credential string
t = datetime.datetime.utcnow()
amz_date = t.strftime('%Y%m%dT%H%M%SZ')
date_stamp = t.strftime('%Y%m%d') # Date w/o time, used in credential scope


canonical_uri = '/'

canonical_querystring = ''

canonical_headers = 'content-type:' + content_type + '\n' + 'host:' + host + '\n' + 'x-amz-date:' + amz_date + '\n' + 'x-amz-target:' + amz_target + '\n'

signed_headers = 'content-type;host;x-amz-date;x-amz-target'

payload_hash = hashlib.sha256(request_parameters).hexdigest()

canonical_request = method + '\n' + canonical_uri + '\n' + canonical_querystring + '\n' + canonical_headers + '\n' + signed_headers + '\n' + payload_hash

algorithm = 'AWS4-HMAC-SHA256'
credential_scope = date_stamp + '/' + region + '/' + service + '/' + 'aws4_request'
string_to_sign = algorithm + '\n' +  amz_date + '\n' +  credential_scope + '\n' +  hashlib.sha256(canonical_request).hexdigest()

signing_key = getSignatureKey(secret_key, date_stamp, region, service)

signature = hmac.new(signing_key, (string_to_sign).encode('utf-8'), hashlib.sha256).hexdigest()

authorization_header = algorithm + ' ' + 'Credential=' + access_key + '/' + credential_scope + ', ' +  'SignedHeaders=' + signed_headers + ', ' + 'Signature=' + signature
print authorization_header;


headers = {'Content-Type':content_type,
           'X-Amz-Date':amz_date,
           'X-Amz-Target':amz_target,
           'Authorization':authorization_header}
# ************* SEND THE REQUEST *************
print '\nBEGIN REQUEST++++++++++++++++++++++++++++++++++++'
print 'Request URL = ' + endpoint
r = requests.post(endpoint, data=request_parameters, headers=headers)
print '\nRESPONSE++++++++++++++++++++++++++++++++++++'
print 'Response code: %d\n' % r.status_code
print r.text

只需更改代码中标有 <> 的必需参数,您就可以通过 HTTP 访问端点。这仅适用于您出于某种原因无法使用 aws-sdk 或 aws-cli 的情况。

以上代码在python中。

要进行转换,请检查以下内容 link。 https://github.com/danharper/hmac-examples

import sys, os, base64, datetime, hashlib, hmac
import requests # pip install requests
# ************* REQUEST VALUES *************
method = 'POST'
service = 'kinesis'
host = 'kinesis.cn-north-1.amazonaws.com.cn'
region = 'cn-north-1'
endpoint = 'https://kinesis.cn-north-1.amazonaws.com.cn'
content_type = 'application/x-amz-json-1.1'

amz_target = 'Kinesis_20131202.PutRecord'
request_parameters = '{'
request_parameters += '"StreamName": "cummins_kinesis_test",'
request_parameters += '"Data": "euoioeuiuoeieoieio",'
request_parameters += '"PartitionKey": "test"'
request_parameters += '}'

# http://docs.aws.amazon.com/general/latest/gr/signature-v4-examples.html#signature-v4-examples-python
def sign(key, msg):
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()
def getSignatureKey(key, date_stamp, regionName, serviceName):
    kDate = sign(('AWS4' + key).encode('utf-8'), date_stamp)
    kRegion = sign(kDate, regionName)
    kService = sign(kRegion, serviceName)
    kSigning = sign(kService, 'aws4_request')
    return kSigning

access_key = 'xxxxxxxxxxxxxxxxxxx'
secret_key = 'xxxxxxxxxxxxxxxxxxx'
if access_key is None or secret_key is None:
    print( 'No access key is available.')
    sys.exit()
# Create a date for headers and the credential string
t = datetime.datetime.utcnow()
amz_date = t.strftime('%Y%m%dT%H%M%SZ')
date_stamp = t.strftime('%Y%m%d') # Date w/o time, used in credential scope


canonical_uri = '/'

canonical_querystring = ''

canonical_headers = 'content-type:' + content_type + '\n' + 'host:' + host + '\n' + 'x-amz-date:' + amz_date + '\n' + 'x-amz-target:' + amz_target + '\n'

signed_headers = 'content-type;host;x-amz-date;x-amz-target'

payload_hash = hashlib.sha256(request_parameters.encode('utf-8')).hexdigest()

canonical_request = method + '\n' + canonical_uri + '\n' + canonical_querystring + '\n' + canonical_headers + '\n' + signed_headers + '\n' + payload_hash

algorithm = 'AWS4-HMAC-SHA256'
credential_scope = date_stamp + '/' + region + '/' + service + '/' + 'aws4_request'
string_to_sign = algorithm + '\n' +  amz_date + '\n' +  credential_scope + '\n' +  hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()

signing_key = getSignatureKey(secret_key, date_stamp, region, service)

signature = hmac.new(signing_key, (string_to_sign).encode('utf-8'), hashlib.sha256).hexdigest()

authorization_header = algorithm + ' ' + 'Credential=' + access_key + '/' + credential_scope + ', ' +  'SignedHeaders=' + signed_headers + ', ' + 'Signature=' + signature
print(authorization_header);


headers = {'Content-Type':content_type,
           'X-Amz-Date':amz_date,
           'X-Amz-Target':amz_target,
           'Authorization':authorization_header}
# ************* SEND THE REQUEST *************
print( '\nBEGIN REQUEST++++++++++++++++++++++++++++++++++++')
print('Request URL = ' + endpoint)
r = requests.post(endpoint, data=request_parameters, headers=headers)
print( '\nRESPONSE++++++++++++++++++++++++++++++++++++')
print( 'Response code: %d\n' % r.status_code)
print( r.text )