使用 Azure Functions 处理来自 IoT 中心的传入 JSON 消息
Processing incoming JSON messages from IoT Hub with Azure Functions
我通过 IoT 中心从 IoT 设备接收消息,并将它们存储在 Cosmos 数据库中。流分析工作正常,但我想使用 Azure Functions。
我想在将传入消息存储到我的 Cosmos DB 之前稍微处理一下。原始消息看起来像这样:
{
"id": "12345",
"time": "2019-12-05T07:21:37.000+0000",
"sensors": [
{
"type": "TEMPERATURE",
"value": 23.30286376953125,
"unit": "C"
},
{
"type": "HUMIDITY",
"value": 29.686492919921875,
"unit": "RH"
},
{
"type": "CELL_ID",
"value": 56789,
"unit": ""
},
{
"type": "RSSI_LEVEL",
"value": -86,
"unit": "dBm"
}
]
}
我想在 Cosmos 中存储的内容应该如下所示:
{
id: "2019-12-05T07:21:37.000+0000",
{
"deviceId": "12345",
"time": "2019-12-05T07:21:37.000+0000",
"type": "TEMPERATURE",
"value": "23.30286376953125",
"unit": "C"
},
{
"deviceId": "12345",
"time": "2019-12-05T07:21:37.000+0000",
"type": "HUMIDITY",
"value": "29.686492919921875",
"unit": "RH"
},
...
}
我将 id
设置为等于 time
的原因是因为我还没有弄清楚如何从 Azure 流分析复制 Document Id
功能。我正在尝试确保对 Cosmos 的写入操作不会覆盖现有文件。
目前,我的 JavaScript 代码如下所示(我不知道 JavaScript,我只是在改编我在 Internet 上找到的代码):
module.exports = function (context, IoTHubMessages) {
context.log(`JavaScript eventhub trigger function called for message array: ${IoTHubMessages}`);
var payload = "";
IoTHubMessages.forEach(message => {
context.log(`Processed message: ${message}`);
payload += { "id": message.time };
message.sensors.forEach(observation => {
if (observation.type != "CELL_ID" && observation.type != "RSSI_LEVEL") {
payload += {
"deviceId": message.id,
"time": message.time,
"type": observation.type,
"value": observation.value,
"unit": observation.unit
};
};
});
});
context.bindings.outputDocument = payload;
context.done();
};
调试上述代码需要很长时间,因为我无法控制 IoT 设备的采样率。我想知道是否有人可以帮助我获得上述代码块以产生所需的结果。
{
id: "2019-12-05T07:21:37.000+0000",
{
"deviceId": "12345",
"time": "2019-12-05T07:21:37.000+0000",
"type": "TEMPERATURE",
"value": "23.30286376953125",
"unit": "C"
},
{
"deviceId": "12345",
"time": "2019-12-05T07:21:37.000+0000",
"type": "HUMIDITY",
"value": "29.686492919921875",
"unit": "RH"
},
...
}
无效 JSON,您缺少设备信息的 属性 名称。这可以更改为:
{
id: "2019-12-05T07:21:37.000+0000",
"deviceInformation": [
{
"deviceId": "12345",
"time": "2019-12-05T07:21:37.000+0000",
"type": "TEMPERATURE",
"value": "23.30286376953125",
"unit": "C"
},
{
"deviceId": "12345",
"time": "2019-12-05T07:21:37.000+0000",
"type": "HUMIDITY",
"value": "29.686492919921875",
"unit": "RH"
},
...
]
}
至于代码,如果您创建一个对象而不是连接字符串可能会更容易,例如:
module.exports = function (context, IoTHubMessages) {
context.log(`JavaScript eventhub trigger function called for message array: ${IoTHubMessages}`);
var payload = [];
IoTHubMessages.forEach(message => {
context.log(`Processed message: ${message}`);
var messageToStore = { id: message.time, deviceInformation: [] };
message.sensors.forEach(observation => {
if (observation.type != "CELL_ID" && observation.type != "RSSI_LEVEL") {
messageToStore.deviceInformation.push({
"deviceId": message.id,
"time": message.time,
"type": observation.type,
"value": observation.value,
"unit": observation.unit
});
};
});
payload.push(messageToStore);
});
context.bindings.outputDocument = payload;
context.done();
};
我通过 IoT 中心从 IoT 设备接收消息,并将它们存储在 Cosmos 数据库中。流分析工作正常,但我想使用 Azure Functions。
我想在将传入消息存储到我的 Cosmos DB 之前稍微处理一下。原始消息看起来像这样:
{
"id": "12345",
"time": "2019-12-05T07:21:37.000+0000",
"sensors": [
{
"type": "TEMPERATURE",
"value": 23.30286376953125,
"unit": "C"
},
{
"type": "HUMIDITY",
"value": 29.686492919921875,
"unit": "RH"
},
{
"type": "CELL_ID",
"value": 56789,
"unit": ""
},
{
"type": "RSSI_LEVEL",
"value": -86,
"unit": "dBm"
}
]
}
我想在 Cosmos 中存储的内容应该如下所示:
{
id: "2019-12-05T07:21:37.000+0000",
{
"deviceId": "12345",
"time": "2019-12-05T07:21:37.000+0000",
"type": "TEMPERATURE",
"value": "23.30286376953125",
"unit": "C"
},
{
"deviceId": "12345",
"time": "2019-12-05T07:21:37.000+0000",
"type": "HUMIDITY",
"value": "29.686492919921875",
"unit": "RH"
},
...
}
我将 id
设置为等于 time
的原因是因为我还没有弄清楚如何从 Azure 流分析复制 Document Id
功能。我正在尝试确保对 Cosmos 的写入操作不会覆盖现有文件。
目前,我的 JavaScript 代码如下所示(我不知道 JavaScript,我只是在改编我在 Internet 上找到的代码):
module.exports = function (context, IoTHubMessages) {
context.log(`JavaScript eventhub trigger function called for message array: ${IoTHubMessages}`);
var payload = "";
IoTHubMessages.forEach(message => {
context.log(`Processed message: ${message}`);
payload += { "id": message.time };
message.sensors.forEach(observation => {
if (observation.type != "CELL_ID" && observation.type != "RSSI_LEVEL") {
payload += {
"deviceId": message.id,
"time": message.time,
"type": observation.type,
"value": observation.value,
"unit": observation.unit
};
};
});
});
context.bindings.outputDocument = payload;
context.done();
};
调试上述代码需要很长时间,因为我无法控制 IoT 设备的采样率。我想知道是否有人可以帮助我获得上述代码块以产生所需的结果。
{
id: "2019-12-05T07:21:37.000+0000",
{
"deviceId": "12345",
"time": "2019-12-05T07:21:37.000+0000",
"type": "TEMPERATURE",
"value": "23.30286376953125",
"unit": "C"
},
{
"deviceId": "12345",
"time": "2019-12-05T07:21:37.000+0000",
"type": "HUMIDITY",
"value": "29.686492919921875",
"unit": "RH"
},
...
}
无效 JSON,您缺少设备信息的 属性 名称。这可以更改为:
{
id: "2019-12-05T07:21:37.000+0000",
"deviceInformation": [
{
"deviceId": "12345",
"time": "2019-12-05T07:21:37.000+0000",
"type": "TEMPERATURE",
"value": "23.30286376953125",
"unit": "C"
},
{
"deviceId": "12345",
"time": "2019-12-05T07:21:37.000+0000",
"type": "HUMIDITY",
"value": "29.686492919921875",
"unit": "RH"
},
...
]
}
至于代码,如果您创建一个对象而不是连接字符串可能会更容易,例如:
module.exports = function (context, IoTHubMessages) {
context.log(`JavaScript eventhub trigger function called for message array: ${IoTHubMessages}`);
var payload = [];
IoTHubMessages.forEach(message => {
context.log(`Processed message: ${message}`);
var messageToStore = { id: message.time, deviceInformation: [] };
message.sensors.forEach(observation => {
if (observation.type != "CELL_ID" && observation.type != "RSSI_LEVEL") {
messageToStore.deviceInformation.push({
"deviceId": message.id,
"time": message.time,
"type": observation.type,
"value": observation.value,
"unit": observation.unit
});
};
});
payload.push(messageToStore);
});
context.bindings.outputDocument = payload;
context.done();
};