使用 Azure Functions 处理来自 IoT 中心的传入 JSON 消息

Processing incoming JSON messages from IoT Hub with Azure Functions

我通过 IoT 中心从 IoT 设备接收消息,并将它们存储在 Cosmos 数据库中。流分析工作正常,但我想使用 Azure Functions。

我想在将传入消息存储到我的 Cosmos DB 之前稍微处理一下。原始消息看起来像这样:

{
    "id": "12345",
    "time": "2019-12-05T07:21:37.000+0000",
    "sensors": [
        {
            "type": "TEMPERATURE",
            "value": 23.30286376953125,
            "unit": "C"
        },
        {
            "type": "HUMIDITY",
            "value": 29.686492919921875,
            "unit": "RH"
        },
        {
            "type": "CELL_ID",
            "value": 56789,
            "unit": ""
        },
        {
            "type": "RSSI_LEVEL",
            "value": -86,
            "unit": "dBm"
        }
    ]
}

我想在 Cosmos 中存储的内容应该如下所示:

{
    id: "2019-12-05T07:21:37.000+0000",
    {
        "deviceId": "12345",
        "time": "2019-12-05T07:21:37.000+0000",
        "type": "TEMPERATURE",
        "value": "23.30286376953125",
        "unit": "C"
    },
    {
        "deviceId": "12345",
        "time": "2019-12-05T07:21:37.000+0000",
        "type": "HUMIDITY",
        "value": "29.686492919921875",
        "unit": "RH"
    },
    ...
}

我将 id 设置为等于 time 的原因是因为我还没有弄清楚如何从 Azure 流分析复制 Document Id 功能。我正在尝试确保对 Cosmos 的写入操作不会覆盖现有文件。

目前,我的 JavaScript 代码如下所示(我不知道 JavaScript,我只是在改编我在 Internet 上找到的代码):

module.exports = function (context, IoTHubMessages) {
    context.log(`JavaScript eventhub trigger function called for message array: ${IoTHubMessages}`);

    var payload = "";

    IoTHubMessages.forEach(message => {
        context.log(`Processed message: ${message}`);

        payload += { "id": message.time };
        message.sensors.forEach(observation => {
            if (observation.type != "CELL_ID" && observation.type != "RSSI_LEVEL") {
                payload += {
                    "deviceId": message.id,
                    "time": message.time,
                    "type": observation.type,
                    "value": observation.value,
                    "unit": observation.unit
                };
            };
        });
    });

    context.bindings.outputDocument = payload;
    context.done();
};

调试上述代码需要很长时间,因为我无法控制 IoT 设备的采样率。我想知道是否有人可以帮助我获得上述代码块以产生所需的结果。

{
    id: "2019-12-05T07:21:37.000+0000",
    {
        "deviceId": "12345",
        "time": "2019-12-05T07:21:37.000+0000",
        "type": "TEMPERATURE",
        "value": "23.30286376953125",
        "unit": "C"
    },
    {
        "deviceId": "12345",
        "time": "2019-12-05T07:21:37.000+0000",
        "type": "HUMIDITY",
        "value": "29.686492919921875",
        "unit": "RH"
    },
    ...
}

无效 JSON,您缺少设备信息的 属性 名称。这可以更改为:

{
    id: "2019-12-05T07:21:37.000+0000",
    "deviceInformation": [
    {
        "deviceId": "12345",
        "time": "2019-12-05T07:21:37.000+0000",
        "type": "TEMPERATURE",
        "value": "23.30286376953125",
        "unit": "C"
    },
    {
        "deviceId": "12345",
        "time": "2019-12-05T07:21:37.000+0000",
        "type": "HUMIDITY",
        "value": "29.686492919921875",
        "unit": "RH"
    },
    ...
    ]
}

至于代码,如果您创建一个对象而不是连接字符串可能会更容易,例如:

module.exports = function (context, IoTHubMessages) {
    context.log(`JavaScript eventhub trigger function called for message array: ${IoTHubMessages}`);

    var payload = [];
    IoTHubMessages.forEach(message => {
        context.log(`Processed message: ${message}`);

        var messageToStore  = { id: message.time, deviceInformation: [] };
        message.sensors.forEach(observation => {
            if (observation.type != "CELL_ID" && observation.type != "RSSI_LEVEL") {
                messageToStore.deviceInformation.push({
                    "deviceId": message.id,
                    "time": message.time,
                    "type": observation.type,
                    "value": observation.value,
                    "unit": observation.unit
                });
            };
        });

        payload.push(messageToStore);
    });

    context.bindings.outputDocument = payload;
    context.done();
};