如何分解 Node.js 中传入的 http 请求以单独处理 json 对象
How to break up incoming http request in Node.js to process the json object individually
我正在处理一个数据处理场景,其中在 Azure IoT 中心接收传入数据,使用 Azure 流分析处理并发布(http 触发器)到 Azure 函数:https://github.com/Azure/iotc-device-bridge 写在 Node.js.这里的目标是打破传入数据并异步处理对象。
传入的数据是 json 个对象的数组。
我试图遍历 req.body 以将其分解为不同的对象,但这没有用。
感谢大家的帮助!
函数的传入消息:
[{
"device": {
"deviceId": "my-cloud-device"
},
"measurements": {
"temp": 20.31,
"pressure": 50,
"humidity": 8.5,
"ledColor": "blue"
}
},
{
"device": {
"deviceId": "my-cloud-device"
},
"measurements": {
"temp": 20.31,
"pressure": 50,
"humidity": 8.5,
"ledColor": "blue"
}
}]
处理传入数据的代码
const request = require('request-promise-native');
const handleMessage = require('./lib/engine');
const bodyParser = require('body-parser');
const msiEndpoint = process.env.MSI_ENDPOINT;
const msiSecret = process.env.MSI_SECRET;
const parameters = {
idScope: process.env.ID_SCOPE,
primaryKeyUrl: process.env.IOTC_KEY_URL
};
let kvToken;
module.exports = async function (context, req) {
try {
await handleMessage({ ...parameters, log: context.log, getSecret: getKeyVaultSecret }, req.body.device, req.body.measurements, req.body.timestamp);
} catch (e) {
context.log('[ERROR]', e.message);
context.res = {
status: e.statusCode ? e.statusCode : 500,
body: e.message
};
}
console.log(b);
}
我期望做的是将传入消息分解为不同的对象,这些对象使用上面的代码块进行异步处理。
{
"device": {
"deviceId": "my-cloud-device"
},
"measurements": {
"temp": 20.31,
"pressure": 50,
"humidity": 8.5,
"ledColor": "blue"
}
}
{
"device": {
"deviceId": "my-cloud-device"
},
"measurements": {
"temp": 20.31,
"pressure": 50,
"humidity": 8.5,
"ledColor": "blue"
}
}
我假设 handleMessage
只针对一个对象。在这种情况下,您将不得不遍历 req.body
来访问每一个。你能分享一下你尝试过的和以前没有用过的吗?
你的代码应该是这样的
///
module.exports = async function (context, req) {
let commonArgs = { ...parameters, log: context.log, getSecret: getKeyVaultSecret };
let promiseList = [];
try {
for (let dev of req.body) {
promiseList.add(handleMessage(commonArgs, dev.device, dev.measurements, dev.timestamp));
}
await Promise.all(promiseList); // This way is much better but you could also await each call too
} catch (e) {
///
我在您的数据中没有看到 timestamp
属性。也许你删除了它?
此外,为了获得更好的规模,您可能需要查看 Durable Functions too, specifically the Fan Out / Fan In 模式。
有了这个,您可以将 handleMessage
移动到一个 activity 函数中,该函数会随着您的负载而扩展。
我正在处理一个数据处理场景,其中在 Azure IoT 中心接收传入数据,使用 Azure 流分析处理并发布(http 触发器)到 Azure 函数:https://github.com/Azure/iotc-device-bridge 写在 Node.js.这里的目标是打破传入数据并异步处理对象。 传入的数据是 json 个对象的数组。
我试图遍历 req.body 以将其分解为不同的对象,但这没有用。
感谢大家的帮助!
函数的传入消息:
[{
"device": {
"deviceId": "my-cloud-device"
},
"measurements": {
"temp": 20.31,
"pressure": 50,
"humidity": 8.5,
"ledColor": "blue"
}
},
{
"device": {
"deviceId": "my-cloud-device"
},
"measurements": {
"temp": 20.31,
"pressure": 50,
"humidity": 8.5,
"ledColor": "blue"
}
}]
处理传入数据的代码
const request = require('request-promise-native');
const handleMessage = require('./lib/engine');
const bodyParser = require('body-parser');
const msiEndpoint = process.env.MSI_ENDPOINT;
const msiSecret = process.env.MSI_SECRET;
const parameters = {
idScope: process.env.ID_SCOPE,
primaryKeyUrl: process.env.IOTC_KEY_URL
};
let kvToken;
module.exports = async function (context, req) {
try {
await handleMessage({ ...parameters, log: context.log, getSecret: getKeyVaultSecret }, req.body.device, req.body.measurements, req.body.timestamp);
} catch (e) {
context.log('[ERROR]', e.message);
context.res = {
status: e.statusCode ? e.statusCode : 500,
body: e.message
};
}
console.log(b);
}
我期望做的是将传入消息分解为不同的对象,这些对象使用上面的代码块进行异步处理。
{
"device": {
"deviceId": "my-cloud-device"
},
"measurements": {
"temp": 20.31,
"pressure": 50,
"humidity": 8.5,
"ledColor": "blue"
}
}
{
"device": {
"deviceId": "my-cloud-device"
},
"measurements": {
"temp": 20.31,
"pressure": 50,
"humidity": 8.5,
"ledColor": "blue"
}
}
我假设 handleMessage
只针对一个对象。在这种情况下,您将不得不遍历 req.body
来访问每一个。你能分享一下你尝试过的和以前没有用过的吗?
你的代码应该是这样的
///
module.exports = async function (context, req) {
let commonArgs = { ...parameters, log: context.log, getSecret: getKeyVaultSecret };
let promiseList = [];
try {
for (let dev of req.body) {
promiseList.add(handleMessage(commonArgs, dev.device, dev.measurements, dev.timestamp));
}
await Promise.all(promiseList); // This way is much better but you could also await each call too
} catch (e) {
///
我在您的数据中没有看到 timestamp
属性。也许你删除了它?
此外,为了获得更好的规模,您可能需要查看 Durable Functions too, specifically the Fan Out / Fan In 模式。
有了这个,您可以将 handleMessage
移动到一个 activity 函数中,该函数会随着您的负载而扩展。