AWS Lambda - 如何获取来自 AWS IOT 的数据的主题名称
AWS Lambda - How to get the topic name of data coming from AWS IOT
我正在使用 AWS IOT 源测试 AWS Lambda。我的 mqtt 客户端在不同的主题中发布:设备 A 将数据发布到 streaming/A
,设备 B 将数据发布到 streaming/B
因此在 AWS Lambda 中我定义了一个 SQL 规则选择来自主题的所有设备streaming/+
。问题是现在我没有设备源的信息,因为我只有 Array[Byte]]
和额外的信息。如果有人有解决方案来访问带有主题信息的 mqtt 有效负载,我会采纳!
import java.io.{ByteArrayOutputStream, InputStream, OutputStream}
import com.amazonaws.services.lambda.runtime.{Context, RequestStreamHandler}
/**
* Created by alifirat on 24/04/17.
*/
class IOTConsumer extends RequestStreamHandler {
val BUFFER_SIZE = 1024 * 4
override def handleRequest(input: InputStream, output: OutputStream, context: Context): Unit = {
val bytes = toByteArray(input)
val logger= context.getLogger
logger.log("Receive following thing :" + new String(bytes))
output.write(bytes)
}
/**
* Reads and returns the rest of the given input stream as a byte array.
* Caller is responsible for closing the given input stream.
*/
def toByteArray(is : InputStream) : Array[Byte] = {
val output = new ByteArrayOutputStream()
try {
val b = new Array[Byte](BUFFER_SIZE);
var n = 0
var flag = true
while(flag) {
n = is.read(b)
if(n == -1) flag = false
else {
output.write(b, 0, n)
}
}
output.toByteArray();
} finally {
output.close();
Array[Byte]()
}
}
}
如果您的触发器是 SNS 消息,那么我会直接阅读 JSON。这将在 Scala 中工作:
import com.amazonaws.services.lambda.runtime.events.SNSEvent
import scala.collection.JavaConverters._
object Example extends LambdaApp {
/** Convert Java lists (or nulls!) to Scala lists */
def safeList[A](xs: java.util.List[A]) =
Option(xs).map(_.asScala).getOrElse(List.empty[A])
/** Install the handler in AWS Lambda as `Example::handler`. */
def handler(e: SNSEvent) = {
val rs = for {
r <- safeList(e.getRecords)
} yield {
r.getSNS.getMessage
}
rs.asJava // Convert Scala list to Java.
}
}
您需要在 build.sbt 中具有以下依赖项:
libraryDependencies ++= Seq(
"com.amazonaws" % "aws-lambda-java-core" % "1.1.0",
"com.amazonaws" % "aws-lambda-java-events" % "1.3.0"
)
如果您对 SNS 主题名称感兴趣,可以从以下位置获取:
r.EventSubscriptionArn
AWS Lambda JDK 库使用 jackson-core 为您解析 SNS 消息的JSON。
我也在寻找同样的东西,有一种方法可以实现。在构建 SQL 时,您可以使用 topic() 函数来获取消息发送到的主题。这样你就可以放入属性部分
*, topic() as topic
因此您的最终 SQL 将如下所示:
SELECT *, topic() as topic FROM one/of/my/+/topics
然后您的有效负载将包含一个新的属性主题,您可以在 lambda 函数中对其进行解析。关于此的更多信息 https://docs.aws.amazon.com/iot/latest/developerguide/iot-sql-functions.html
我正在使用 AWS IOT 源测试 AWS Lambda。我的 mqtt 客户端在不同的主题中发布:设备 A 将数据发布到 streaming/A
,设备 B 将数据发布到 streaming/B
因此在 AWS Lambda 中我定义了一个 SQL 规则选择来自主题的所有设备streaming/+
。问题是现在我没有设备源的信息,因为我只有 Array[Byte]]
和额外的信息。如果有人有解决方案来访问带有主题信息的 mqtt 有效负载,我会采纳!
import java.io.{ByteArrayOutputStream, InputStream, OutputStream}
import com.amazonaws.services.lambda.runtime.{Context, RequestStreamHandler}
/**
* Created by alifirat on 24/04/17.
*/
class IOTConsumer extends RequestStreamHandler {
val BUFFER_SIZE = 1024 * 4
override def handleRequest(input: InputStream, output: OutputStream, context: Context): Unit = {
val bytes = toByteArray(input)
val logger= context.getLogger
logger.log("Receive following thing :" + new String(bytes))
output.write(bytes)
}
/**
* Reads and returns the rest of the given input stream as a byte array.
* Caller is responsible for closing the given input stream.
*/
def toByteArray(is : InputStream) : Array[Byte] = {
val output = new ByteArrayOutputStream()
try {
val b = new Array[Byte](BUFFER_SIZE);
var n = 0
var flag = true
while(flag) {
n = is.read(b)
if(n == -1) flag = false
else {
output.write(b, 0, n)
}
}
output.toByteArray();
} finally {
output.close();
Array[Byte]()
}
}
}
如果您的触发器是 SNS 消息,那么我会直接阅读 JSON。这将在 Scala 中工作:
import com.amazonaws.services.lambda.runtime.events.SNSEvent
import scala.collection.JavaConverters._
object Example extends LambdaApp {
/** Convert Java lists (or nulls!) to Scala lists */
def safeList[A](xs: java.util.List[A]) =
Option(xs).map(_.asScala).getOrElse(List.empty[A])
/** Install the handler in AWS Lambda as `Example::handler`. */
def handler(e: SNSEvent) = {
val rs = for {
r <- safeList(e.getRecords)
} yield {
r.getSNS.getMessage
}
rs.asJava // Convert Scala list to Java.
}
}
您需要在 build.sbt 中具有以下依赖项:
libraryDependencies ++= Seq(
"com.amazonaws" % "aws-lambda-java-core" % "1.1.0",
"com.amazonaws" % "aws-lambda-java-events" % "1.3.0"
)
如果您对 SNS 主题名称感兴趣,可以从以下位置获取:
r.EventSubscriptionArn
AWS Lambda JDK 库使用 jackson-core 为您解析 SNS 消息的JSON。
我也在寻找同样的东西,有一种方法可以实现。在构建 SQL 时,您可以使用 topic() 函数来获取消息发送到的主题。这样你就可以放入属性部分
*, topic() as topic
因此您的最终 SQL 将如下所示:
SELECT *, topic() as topic FROM one/of/my/+/topics
然后您的有效负载将包含一个新的属性主题,您可以在 lambda 函数中对其进行解析。关于此的更多信息 https://docs.aws.amazon.com/iot/latest/developerguide/iot-sql-functions.html