无法识别将 JSON 数据从 Kinesis Firehose 发送到 elasticsearch 的实际值

Real value not recognized sending JSON data from Kinesis Firehose to elasticsearch

我在 Kibana 中遇到问题,字段 value 在以下几行中进行了解释。我会尽力解释情况。

我将 dynamoDB 流发送到 Lambda,然后发送到 Kenesis Firehouse,最后从 Firehose 发送到 Elasticsearch。我正在使用 Kibana 来可视化数据,这就是我遇到的问题。

假设我正在将此 JSON 发送到 DynamoDB:

{
    "id": "identificator",
    "timestamp": "2017-05-09T06:38:00.337Z",
    "value": 33,
    "units": "units",
    "description": "This is the description",
    "machine": {
        "brand": "brand",
        "application": "application"
    }
}

在 Lambda 中,我收到以下信息:

{
    "data": {
        "M": {
            "machine": {
                "M": {
                    "application": {
                        "S": "application"
                    },
                    "brand": {
                        "S": "band"
                    }
                }
            },
            "description": {
                "S": "This is the description"
            },
            "id": {
                "S": "identificator"
            },
            "units": {
                "S": "units"
            },
            "value": {
                "N": "33"
            },
            "_msgid": {
                "S": "85209b75.f51ee8"
            },
            "timestamp": {
                "S": "2017-05-09T06:38:00.337Z"
            }
        }
    },
    "id": {
        "S": "85209b75.f51ee8"
    }
}

如果我将最后一个 JSON 转发给 Kinesis Firehose,当我在 Kibana 中配置索引模式时,它会自动识别 "timestamp"(这很棒)。这里的问题是,字段 "value" 就像一个字符串,无法识别。

我尝试修改 JSON 然后再次将其发送到 Firehose 但 Kibana 无法识别 "timestamp":

{
    "data": {
        "machine": {
            "application": "application",
            "brand": "brand"
        },
        "description": "This is the description",
        "id": "identificator",
        "units": "KWh",
        "value": 33,
        "_msgid": "85209b75.f51ee8",
        "timestamp": "2017-05-09T06:38:00.337Z"
    },
    "id": "85209b75.f51ee8"
}

我想知道我如何发送这些数据并且 Kibana 识别 "timestamp" 和 "value" 字段。

这是我在 lambda 中使用的代码示例:

var AWS = require('aws-sdk');
var unmarshalJson = require('dynamodb-marshaler').unmarshalJson;

var firehose = new AWS.Firehose();

exports.lambda_handler = function(event, context) {

    var record = JSON.stringify(event.Records[0].dynamodb.NewImage);

    console.log("[INFO]:"+JSON.stringify(event.Records[0].dynamodb.NewImage));

    var params = {
        DeliveryStreamName: 'DeliveryStreamName',

        Record:{ 
            Data: record
        }
    };
    firehose.putRecord(params, function(err, data) {

        if (err) console.log(err, err.stack); // an error occurred
        else     console.log(JSON.stringify(data));           // successful response

        context.done();
    });
};

我自己解决了创建索引映射的问题,而不是让 Kinesis Firehose 创建它。并将 "timestamp" 属性声明为 { "type" : "date" },将 "value" 属性声明为 { "type" : "float" }

例如对于这种类型 JSON:

{
    "data": {
        "timestamp": "2017-05-09T11:30:41.484Z",
        "tag": "tag",
        "value": 33,
        "units": "units",
        "type": "type",
        "machine":{
            "name": "name",
            "type": "type",
            "company": "company"
        }
    },
    "id": "85209b75.f51ee8"
}

我手动创建了以下 elasticsearch 索引和映射:

PUT /index
{
    "settings" : {
        "number_of_shards" : 2
    },
    "mappings" : {
        "type" : {
            "properties" : {
                "data" : {
                    "properties" : {
                        "machine":{
                            "properties": {
                                "name": { "type" : "text" },
                                "type": { "type" : "text" },
                                "company": { "type" : "text" }
                            }
                        },
                        "timestamp": { "type" : "date" },
                        "tag" : { "type" : "text" },
                        "value": { "type" : "float" },
                        "description":  { "type" : "text" },
                        "units":  { "type" : "text" },
                        "type" : { "type" : "text" },
                        "_msgid":  { "type" : "text" }
                    }
                },
                "id":  { "type" : "text" }      
            }
        }
    }
}

所以,要解决这个问题,我认为更好的解决方案是在 lambda 中你必须检查索引映射是否存在,如果不存在则由你自己创建。