发布到 IOT 主题的 AWS Lambda 无限期触发
AWS Lambda publishing to IOT Topic fires indefinitely
问题:
我有一个 node.js (8.10) AWS Lambda 函数,它接受一个 json 对象并将其发布到 IOT 主题。该函数成功发布到主题,但是,一旦触发它就会不断调用,直到我将并发性限制为零以停止对该函数的任何进一步调用。
我想弄清楚我实施的错误导致调用该函数的不止一个实例。
函数:
这是我的函数:
var AWS = require('aws-sdk');
exports.handler = function (event, context) {
var iotdata = new AWS.IotData({endpoint: 'xxxxxxxxxx.iot.us-east-1.amazonaws.com'});
var params = {
topic: '/PiDevTest/SyncDevice',
payload: JSON.stringify(event),
qos: 0
};
iotdata.publish(params, function(err, data) {
if (err) {
console.log(err, err.stack);
} else {
console.log("Message sent.");
context.succeed();
}
});
};
我的测试json是:
{
"success": 1,
"TccvID": "TestID01"
}
测试控制台有 "null" 的响应,但 IOT 主题显示来自测试 json 的数据,大约每秒发布一次到主题。
我试过的
-我试图在它自己的非匿名函数中定义处理程序,称为处理程序,然后让 exports.handler = 处理程序;这没有产生任何错误,但也没有成功 post 到 iot 主题。
-我认为问题可能出在 node.js 回调上。我已经尝试实施它并将其排除在外(上面的当前迭代),但两种方式似乎都没有什么不同。我曾在某处读到该函数在出错时会重试,但我相信这种情况只会发生三次,因此无法解释函数的无限期调用。
-我还尝试从另一个 lambda 调用函数以确保问题不是 aws 测试工具。但是,这产生了相同的行为。
总结:
我做错了什么导致此函数无限期地将 json 数据发布到物联网主题?
提前感谢您抽出时间提供专业知识。
使用 aws-iot-device-sdk 创建 MQTT 客户端并使用它的 messageHandler 和 publish 方法将您的消息发布到 IOT 主题。示例 MQTT 客户端代码如下,
import * as DeviceSdk from 'aws-iot-device-sdk';
import * as AWS from 'aws-sdk';
let instance: any = null;
export default class IoTClient {
client: any;
/**
* Constructor
*
* @params {boolean} createNewClient - Whether or not to use existing client instance
*/
constructor(createNewClient = false, options = {}) {
}
async init(createNewClient, options) {
if (createNewClient && instance) {
instance.disconnect();
instance = null;
}
if (instance) {
return instance;
}
instance = this;
this.initClient(options);
this.attachDebugHandlers();
}
/**
* Instantiate AWS IoT device object
* Note that the credentials must be initialized with empty strings;
* When we successfully authenticate to the Cognito Identity Pool,
* the credentials will be dynamically updated.
*
* @params {Object} options - Options to pass to DeviceSdk
*/
initClient(options) {
const clientId = getUniqueId();
this.client = DeviceSdk.device({
region: options.region || getConfig('iotRegion'),
// AWS IoT Host endpoint
host: options.host || getConfig('iotHost'),
// clientId created earlier
clientId: options.clientId || clientId,
// Connect via secure WebSocket
protocol: options.protocol || getConfig('iotProtocol'),
// Set the maximum reconnect time to 500ms; this is a browser application
// so we don't want to leave the user waiting too long for reconnection after
// re-connecting to the network/re-opening their laptop/etc...
baseReconnectTimeMs: options.baseReconnectTimeMs || 500,
maximumReconnectTimeMs: options.maximumReconnectTimeMs || 1000,
// Enable console debugging information
debug: (typeof options.debug === 'undefined') ? true : options.debug,
// AWS access key ID, secret key and session token must be
// initialized with empty strings
accessKeyId: options.accessKeyId,
secretKey: options.secretKey,
sessionToken: options.sessionToken,
// Let redux handle subscriptions
autoResubscribe: (typeof options.debug === 'undefined') ? false : options.autoResubscribe,
});
}
disconnect() {
this.client.end();
}
attachDebugHandlers() {
this.client.on('reconnect', () => {
logger.info('reconnect');
});
this.client.on('offline', () => {
logger.info('offline');
});
this.client.on('error', (err) => {
logger.info('iot client error', err);
});
this.client.on('message', (topic, message) => {
logger.info('new message', topic, JSON.parse(message.toString()));
});
}
updateWebSocketCredentials(accessKeyId, secretAccessKey, sessionToken) {
this.client.updateWebSocketCredentials(accessKeyId, secretAccessKey, sessionToken);
}
attachMessageHandler(onNewMessageHandler) {
this.client.on('message', onNewMessageHandler);
}
attachConnectHandler(onConnectHandler) {
this.client.on('connect', (connack) => {
logger.info('connected', connack);
onConnectHandler(connack);
});
}
attachCloseHandler(onCloseHandler) {
this.client.on('close', (err) => {
logger.info('close', err);
onCloseHandler(err);
});
}
publish(topic, message) {
this.client.publish(topic, message);
}
subscribe(topic) {
this.client.subscribe(topic);
}
unsubscribe(topic) {
this.client.unsubscribe(topic);
logger.info('unsubscribed from topic', topic);
}
}
***getConfig()是从yml文件中获取环境变量,也可以直接在这里指定。
虽然他只是将其作为评论发布,但 MarkB 为我指明了正确的方向。
问题是解决方案与另一个正在收听相同主题并调用我正在处理的 lambda 的 lambda 有关。这导致了循环逻辑,因为从未满足退出条件。修复该代码解决了这个问题。
问题:
我有一个 node.js (8.10) AWS Lambda 函数,它接受一个 json 对象并将其发布到 IOT 主题。该函数成功发布到主题,但是,一旦触发它就会不断调用,直到我将并发性限制为零以停止对该函数的任何进一步调用。
我想弄清楚我实施的错误导致调用该函数的不止一个实例。
函数:
这是我的函数:
var AWS = require('aws-sdk');
exports.handler = function (event, context) {
var iotdata = new AWS.IotData({endpoint: 'xxxxxxxxxx.iot.us-east-1.amazonaws.com'});
var params = {
topic: '/PiDevTest/SyncDevice',
payload: JSON.stringify(event),
qos: 0
};
iotdata.publish(params, function(err, data) {
if (err) {
console.log(err, err.stack);
} else {
console.log("Message sent.");
context.succeed();
}
});
};
我的测试json是:
{
"success": 1,
"TccvID": "TestID01"
}
测试控制台有 "null" 的响应,但 IOT 主题显示来自测试 json 的数据,大约每秒发布一次到主题。
我试过的
-我试图在它自己的非匿名函数中定义处理程序,称为处理程序,然后让 exports.handler = 处理程序;这没有产生任何错误,但也没有成功 post 到 iot 主题。
-我认为问题可能出在 node.js 回调上。我已经尝试实施它并将其排除在外(上面的当前迭代),但两种方式似乎都没有什么不同。我曾在某处读到该函数在出错时会重试,但我相信这种情况只会发生三次,因此无法解释函数的无限期调用。
-我还尝试从另一个 lambda 调用函数以确保问题不是 aws 测试工具。但是,这产生了相同的行为。
总结:
我做错了什么导致此函数无限期地将 json 数据发布到物联网主题?
提前感谢您抽出时间提供专业知识。
使用 aws-iot-device-sdk 创建 MQTT 客户端并使用它的 messageHandler 和 publish 方法将您的消息发布到 IOT 主题。示例 MQTT 客户端代码如下,
import * as DeviceSdk from 'aws-iot-device-sdk';
import * as AWS from 'aws-sdk';
let instance: any = null;
export default class IoTClient {
client: any;
/**
* Constructor
*
* @params {boolean} createNewClient - Whether or not to use existing client instance
*/
constructor(createNewClient = false, options = {}) {
}
async init(createNewClient, options) {
if (createNewClient && instance) {
instance.disconnect();
instance = null;
}
if (instance) {
return instance;
}
instance = this;
this.initClient(options);
this.attachDebugHandlers();
}
/**
* Instantiate AWS IoT device object
* Note that the credentials must be initialized with empty strings;
* When we successfully authenticate to the Cognito Identity Pool,
* the credentials will be dynamically updated.
*
* @params {Object} options - Options to pass to DeviceSdk
*/
initClient(options) {
const clientId = getUniqueId();
this.client = DeviceSdk.device({
region: options.region || getConfig('iotRegion'),
// AWS IoT Host endpoint
host: options.host || getConfig('iotHost'),
// clientId created earlier
clientId: options.clientId || clientId,
// Connect via secure WebSocket
protocol: options.protocol || getConfig('iotProtocol'),
// Set the maximum reconnect time to 500ms; this is a browser application
// so we don't want to leave the user waiting too long for reconnection after
// re-connecting to the network/re-opening their laptop/etc...
baseReconnectTimeMs: options.baseReconnectTimeMs || 500,
maximumReconnectTimeMs: options.maximumReconnectTimeMs || 1000,
// Enable console debugging information
debug: (typeof options.debug === 'undefined') ? true : options.debug,
// AWS access key ID, secret key and session token must be
// initialized with empty strings
accessKeyId: options.accessKeyId,
secretKey: options.secretKey,
sessionToken: options.sessionToken,
// Let redux handle subscriptions
autoResubscribe: (typeof options.debug === 'undefined') ? false : options.autoResubscribe,
});
}
disconnect() {
this.client.end();
}
attachDebugHandlers() {
this.client.on('reconnect', () => {
logger.info('reconnect');
});
this.client.on('offline', () => {
logger.info('offline');
});
this.client.on('error', (err) => {
logger.info('iot client error', err);
});
this.client.on('message', (topic, message) => {
logger.info('new message', topic, JSON.parse(message.toString()));
});
}
updateWebSocketCredentials(accessKeyId, secretAccessKey, sessionToken) {
this.client.updateWebSocketCredentials(accessKeyId, secretAccessKey, sessionToken);
}
attachMessageHandler(onNewMessageHandler) {
this.client.on('message', onNewMessageHandler);
}
attachConnectHandler(onConnectHandler) {
this.client.on('connect', (connack) => {
logger.info('connected', connack);
onConnectHandler(connack);
});
}
attachCloseHandler(onCloseHandler) {
this.client.on('close', (err) => {
logger.info('close', err);
onCloseHandler(err);
});
}
publish(topic, message) {
this.client.publish(topic, message);
}
subscribe(topic) {
this.client.subscribe(topic);
}
unsubscribe(topic) {
this.client.unsubscribe(topic);
logger.info('unsubscribed from topic', topic);
}
}
***getConfig()是从yml文件中获取环境变量,也可以直接在这里指定。
虽然他只是将其作为评论发布,但 MarkB 为我指明了正确的方向。
问题是解决方案与另一个正在收听相同主题并调用我正在处理的 lambda 的 lambda 有关。这导致了循环逻辑,因为从未满足退出条件。修复该代码解决了这个问题。