Kinesis Producer 中的数据损坏

Data corruption in Kinesis Producer

我正在使用运动生成器在 AWS Cloudwatch 中插入一些分析数据。 这是整个数据流: 1. Kinesis 生产者(使用 java SDK)-> Kinesis Stream -> AWS lambda 函数 -> AWS Cloudwatch

Kinesis 中使用的生产者版本:

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-producer</artifactId>
    <version>0.13.1</version>
</dependency>

使用以下代码将我的分析数据推送到生产者后,

kinesisProducer.addUserRecord(stream, "101", ByteBuffer.wrap(requestString.getBytes()));

正在发送给生产者的数据:

{
    "version": null,
    "namespace": "namespace",
    "metricdata": [
        {
            "name": "system",
            "unit": "Megabytes",
            "value": 879,
            "timestamp": 1590233414481,
            "dimensions": [
                {
                    "name": "systemName",
                    "value": "ramTotal"
                }
            ],
            "type": "gauge"
        }]
}

我才知道,数据是由kinesis producer转换成base64格式的。因此 像这样解码后在 lambda 函数中接收到的数据 -

const payload = Buffer.from(record.kinesis.data, 'base64').toString('ascii');

payload就这样来了

s   B
101fGaG
{
    "version": null,
    "namespace": "`$\u0017`$>`[=14=]o?=o?=`$!`$\u0010`$*",
    "metricdata": [
        {
            "name": "system",
            "unit": "Megabytes",
            "value": 879,
            "timestamp": 1590233414481,
            "dimensions": [
                {
                    "name": "systemName",
                    "value": "ramTotal"
                }
            ],
            "type": "gauge"
        }]
}

谁能帮我解决这个问题。

Kinesis Producer 将小记录聚合成最大 1MB 的较大记录。 Kinesis 消费者必须在处理前对记录进行解聚。

reference link

de-aggregation library