理论上是否可以通过单个 http POST 请求在 Kinesis 中使用 `putRecord`?

Is it theoretically possible to use `putRecord` in Kinesis via singular http POST request?

我在使用 AWS Kinesis 时遇到了一些问题,因为我设置了流并且我想使用标准的 http POST 请求在我的流上调用 Kinesis PutRecord 调用。我这样做是因为 bundle-size 我的结果 javascript 应用程序很重要,我宁愿不导入 aws-sdk 来完成应该(在纸面上)可能的事情。

如你所知,我查看了 并且它...有点信息性。

现在,我已经有了使用访问密钥、秘密令牌和 session 令牌对请求进行 sigv4 签名的方法。但是当我最终得到签署请求的结果并使用 in-browser fetch api 发送它时,服务坦克带有(或带有 json object引用同样的事情,取决于我的 Content-Type header,我猜)作为结果。

这是我正在使用的代码

// There is a global function "sign" that does sigv4 signing
// ...

var payload = {
    Data: { task: "Get something working in kinesis" },
    PartitionKey: "1",
    StreamName: "MyKinesisStream"
}

var credentials =  {
    "accessKeyId": "<access.key>",
    "secretAccessKey": "<secret.key>",
    "sessionToken": "<session.token>",
    "expiration": 1528922673000
}

function signer({ url, method, data }) {
    // Wrapping with URL for piecemeal picking of parsed pieces
    const parsed = new URL(url);

    const [ service, region ] = parsed.host.split(".");

    const signed = sign({
        method,
        service,
        region,
        url,

        // Hardcoded
        headers : {
            Host           : parsed.host,
            "Content-Type" : "application/json; charset=UTF-8",

            "X-Amz-Target" : "Kinesis_20131202.PutRecord"
        },

        body : JSON.stringify(data),
    }, credentials);

    return signed;
}

// Specify method, url, data body
var signed = signer({
    method: "POST",
    url: "https://kinesis.us-west-2.amazonaws.com",
    data : JSON.stringify(payload)
});

var request = fetch(signed.url, signed);

当我查看请求的结果时,我得到了这个:

{
    Output: { 
       __type: "com.amazon.coral.service#InternalFailure"},
       Version: "1.0" 
}

现在我不确定 Kinesis 是否真的失败了,或者我的输入是否格式错误?

这是签名请求的样子

{
    "method": "POST",
    "service": "kinesis",
    "region": "us-west-2",
    "url": "https://kinesis.us-west-2.amazonaws.com",
    "headers": {
        "Host": "kinesis.us-west-2.amazonaws.com",
        "Content-Type": "application/json; charset=UTF-8",
        "X-Amz-Target": "Kinesis_20131202.PutRecord",
        "X-Amz-Date": "20180613T203123Z",
        "X-Amz-Security-Token": "<session.token>",
        "Authorization": "AWS4-HMAC-SHA256 Credential=<access.key>/20180613/us-west-2/kinesis/aws4_request, SignedHeaders=content-type;host;x-amz-target, Signature=ba20abb21763e5c8e913527c95a0c7efba590cf5ff1df3b770d4d9b945a10481"
    },
    "body": "\"{\\"Data\\":{\\"task\\":\\"Get something working in kinesis\\"},\\"PartitionKey\\":\\"1\\",\\"StreamName\\":\\"MyKinesisStream\\"}\"",
    "test": {
        "canonical": "POST\n/\n\ncontent-type:application/json; charset=UTF-8\nhost:kinesis.us-west-2.amazonaws.com\nx-amz-target:Kinesis_20131202.PutRecord\n\ncontent-type;host;x-amz-target\n508d2454044bffc25250f554c7b4c8f2e0c87c2d194676c8787867662633652a",
        "sts": "AWS4-HMAC-SHA256\n20180613T203123Z\n20180613/us-west-2/kinesis/aws4_request\n46a252f4eef52991c4a0903ab63bca86ec1aba09d4275dd8f5eb6fcc8d761211",
        "auth": "AWS4-HMAC-SHA256 Credential=<access.key>/20180613/us-west-2/kinesis/aws4_request, SignedHeaders=content-type;host;x-amz-target, Signature=ba20abb21763e5c8e913527c95a0c7efba590cf5ff1df3b770d4d9b945a10481"
    }

(测试密钥被生成签名的库使用,忽略) (body 中可能还有额外的斜杠,因为我使用 JSON.stringify 漂亮地打印了响应 object)。

我的问题:我是否遗漏了什么? Kinesis 是否需要 headers a、b 和 c 而我只生成其中两个?还是此内部错误是实际故障。我迷路了,因为回应表明我无能为力。

感谢任何帮助!

编辑:作为次要问题,我是否正确使用了 X-Amz-Target header?这 只要您访问该服务端点,您如何引用调用服务函数,不是吗?

更新:按照迈克尔的意见,我已经有所进展,但我仍然没有解决问题。这是我所做的:

我确保在我的 payload 中,我只在 Data 属性 上 运行ning JSON.stringify

我还将 Content-Type header 修改为 "Content-Type" : "application/x-amz-json-1.1" 因此,我收到了一些更有用的错误消息。

现在,我的有效负载仍然基本相同:

var payload = {
    Data: JSON.stringify({ task: "Get something working in kinesis" }),
    PartitionKey: "1",
    StreamName: "MyKinesisStream"
}

我的签名函数 body 看起来像这样:

 function signer({ url, method, data }) {
    // Wrapping with URL for piecemeal picking of parsed pieces
    const parsed = new URL(url);

    const [ service, region ] = parsed.host.split(".");

    const signed = sign({
        method,
        service,
        region,
        url,

        // Hardcoded
        headers : {
            Host           : parsed.host,
            "Content-Type" : "application/json; charset=UTF-8",

            "X-Amz-Target" : "Kinesis_20131202.PutRecord"
        },

        body : data,
    }, credentials);

    return signed;
}

所以我传递了一个部分序列化的 object(至少数据是),当我将它发送到服务时,我得到的响应是:

{"__type":"SerializationException"}

这至少 有点帮助 因为它告诉我我的输入在技术上是不正确的。但是,我已经做了一些尝试来纠正这个问题:

但我仍然遇到同样的错误。我觉得我很接近。你能发现我可能遗漏的任何东西或我没有尝试过的东西吗?我遇到了零星的 unknownoperationexceptions,但我认为现在这个序列化让我感到难过。

编辑 2:

事实证明,Kinesis 将只接受 base64 编码的字符串。这可能是 aws-sdk 提供的一个好处,但基本上它所需要的只是有效负载中的 Data: btoa(JSON.stringify({ task: "data"})) 来让它工作

虽然我不确定这是唯一的问题,但您发送的请求正文似乎包含错误序列化(双重编码)的负载。

var obj = { foo: 'bar'};

JSON.stringify(obj) return一个字符串...

'{"foo": "bar"}' // the ' are not part of the string, I'm using them to illustrate that this is a thing of type string.

...当用 JSON 解析器解析时,这个 return 是一个对象 .

{ foo: 'bar' }

但是,JSON.stringify(JSON.stringify(obj)) return是一个不同的字符串...

'"{\"foo\": \"bar\"}"'

...但是在解析时,这个 return 是一个字符串

 '{"foo": "bar"}'

服务端点期望解析正文并获取对象,而不是字符串...因此,解析请求正文(从服务的角度来看)return 不正确类型。该错误似乎是服务未能在非常低的级别解析您的请求。

在您的代码中,body: JSON.stringify(data) 应该只是 body: data,因为之前您已经使用 data: JSON.stringify(payload).

创建了一个 JSON 对象

正如所写,您实际上是将 body 设置为 JSON.stringify(JSON.stringify(payload)).

不确定您是否曾经想过这个问题,但是在搜索如何执行此操作时 Google 上会弹出这个问题。我认为您缺少的一件是 Record Data 字段必须是 base64 编码的。这是将执行此操作的一段 NodeJS 代码(使用 PutRecords)。

如果有人问,为什么不直接使用 SDK?我目前必须从集群中流式传输数据,由于其他依赖关系,该集群无法更新为 SDK 所需的 NodeJS 版本。耶。

const https = require('https')
const aws4  = require('aws4')
const request = function(o) { https.request(o, function(res) { res.pipe(process.stdout) }).end(o.body || '') }

const _publish_kinesis = function(logs) {
    const kin_logs = logs.map(function (l) {
        let blob = JSON.stringify(l) + '\n'
        let buff = Buffer.from(blob, 'binary');
        let base64data = buff.toString('base64');

        return {
            Data: base64data,
            PartitionKey: '0000'
        }
    })

    while(kin_logs.length > 0) {
        let data = JSON.stringify({
            Records: kin_logs.splice(0,250),
            StreamName: 'your-streamname'
        })

        let _request = aws4.sign({
            hostname: 'kinesis.us-west-2.amazonaws.com',
            method: 'POST',
            body: data,
            path: '/?Action=PutRecords',
            headers: {
                'Content-Type': 'application/x-amz-json-1.1',
                'X-Amz-Target': 'Kinesis_20131202.PutRecords'
            },
         }, {
            secretAccessKey: "****",
            accessKeyId: "****"
           // sessionToken: "<your-session-token>"
         })

        request(_request)
    }
}

var logs = [{
  'timeStamp': new Date().toISOString(),
  'value': 'test02',
},{
  'timeStamp': new Date().toISOString(),
  'value': 'test01',
}]
_publish_kinesis(logs)