Parse JSON array input in IoT Analytics
I receive multiple data records at once, as a JSON array, from an IoT device in my channel. A received message looks like this:
[
    {
        "Field1": "Value1",
        "Field2": "Value2",
        "Field3": "Value3"
    },
    {
        "Field1": "AnotherValue1",
        "Field2": "AnotherValue2",
        "Field3": "AnotherValue3"
    }
]
I created a dataset using the following SQL query:
SELECT * FROM mydatastore
When I run the dataset, the result returned is:
array __dt
----- -----
[{field1=Value1, field2=Value2, field3=Value3}] 2019-02-21 00:00:00.000
The result I want is:
Field1 Field2 Field3
------ ------ ------
Value1 Value2 Value3
AnotherValue1 AnotherValue2 AnotherValue3
How can I get IoT Analytics to create a new row in the datastore for each element within the received JSON array?
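The simplest approach is probably to use a Lambda Activity in your pipeline and have it parse the single JSON payload into the desired structure. The details depend somewhat on the 'raw' structure of the messages sent to the channel.
So, for example, we can send data to the channel with the CLI batch-put-message command, like this: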
aws iotanalytics batch-put-message --channel-name sample_channel --messages '[{"messageId": "message1", "payload": "{\"array\": [{\"Field1\": \"Value1\", \"Field2\": \"Value2\", \"Field3\": \"Value3\"},{\"Field1\": \"AnotherValue1\", \"Field2\": \"AnotherValue2\", \"Field3\": \"AnotherValue3\"}]}"}]'
The channel will then contain one message structured like this:
{
    "messageId": "message1",
    "payload": {
        "array": [
            {
                "Field1": "Value1",
                "Field2": "Value2",
                "Field3": "Value3"
            },
            {
                "Field1": "AnotherValue1",
                "Field2": "AnotherValue2",
                "Field3": "AnotherValue3"
            }
        ]
    }
}
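The hand-escaped payload in the CLI call above is easy to get wrong, so it may help to build it programmatically. The following is a minimal sketch, not the original poster's method: `build_messages` is a hypothetical helper, and the commented-out boto3 call assumes AWS credentials are configured and that the channel is named sample_channel as above.

```python
import json


def build_messages(records, message_id="message1"):
    """Wrap records in the {"array": [...]} envelope used in the CLI example."""
    payload = json.dumps({"array": records})
    # boto3 takes the payload as bytes; the CLI example passes the same JSON as a string.
    return [{"messageId": message_id, "payload": payload.encode("utf-8")}]


records = [
    {"Field1": "Value1", "Field2": "Value2", "Field3": "Value3"},
    {"Field1": "AnotherValue1", "Field2": "AnotherValue2", "Field3": "AnotherValue3"},
]
messages = build_messages(records)

# Sending requires AWS access, so it is left commented out here:
# import boto3
# boto3.client("iotanalytics").batch_put_message(
#     channelName="sample_channel", messages=messages
# )
```

Letting json.dumps produce the payload string avoids the nested backslash escaping needed on the command line.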
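If your pipeline has a Lambda Activity, the messages from the channel are passed to your Lambda function in the event argument.
I created a simple Lambda function (Python 3.7) with the AWS Lambda console's inline editor and named it sample_lambda: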
import json
import sys
import logging

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
streamHandler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)


def lambda_handler(event, context):
    # This can be handy to see the raw structure of the incoming event
    # will log to the matching CloudWatch log:
    # /aws/lambda/<name_of_the_lambda>
    # logger.info("raw event: {}".format(event))
    parsed_rows = []
    # Depending on the batchSize setting of the Lambda Pipeline Activity,
    # you may receive multiple messages in a single event
    for message_payload in event:
        if 'array' in message_payload:
            for row in message_payload['array']:
                parsed = {}
                for key, value in row.items():
                    parsed[key] = value
                parsed_rows.append(parsed)
    return parsed_rows
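Before deploying, the flattening logic can be checked locally against an event shaped like the channel message. This is only a sketch: `flatten_event` is a hypothetical stand-in that mirrors the handler's loop, and the sample event assumes the Lambda receives a list of payload dicts, as the handler above does.

```python
def flatten_event(event):
    """Return one dict per element of each message's 'array' field."""
    rows = []
    for message_payload in event:
        # Messages without an 'array' key contribute no rows, as in the handler.
        for row in message_payload.get("array", []):
            rows.append(dict(row))
    return rows


# One message in the batch, carrying two records.
sample_event = [
    {
        "array": [
            {"Field1": "Value1", "Field2": "Value2", "Field3": "Value3"},
            {"Field1": "AnotherValue1", "Field2": "AnotherValue2", "Field3": "AnotherValue3"},
        ]
    }
]

rows = flatten_event(sample_event)
for row in rows:
    print(row)
```

Each returned dict becomes a separate message for the next pipeline activity, which is what yields one datastore row per array element.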
I added the appropriate permission via the CLI so that IoT Analytics can invoke the Lambda function:
aws lambda add-permission --function-name sample_lambda --statement-id statm01 --principal iotanalytics.amazonaws.com --action lambda:InvokeFunction
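The answer does not show how the Lambda Activity is attached to the pipeline. One way to do it, sketched here with boto3 under stated assumptions, is to update the pipeline's activity list and then start reprocessing. The pipeline and datastore names (sample_pipeline, sample_datastore), the activity names, and the batch size of 10 are placeholders and not taken from the original post.

```python
def build_activities(channel_name, lambda_name, datastore_name, batch_size=10):
    """Build a channel -> lambda -> datastore activity chain for a pipeline."""
    return [
        {"channel": {"name": "from_channel",
                     "channelName": channel_name,
                     "next": "flatten_array"}},
        {"lambda": {"name": "flatten_array",
                    "lambdaName": lambda_name,
                    "batchSize": batch_size,
                    "next": "to_datastore"}},
        {"datastore": {"name": "to_datastore",
                       "datastoreName": datastore_name}},
    ]


def apply_and_reprocess(client, pipeline_name, activities):
    """Update the pipeline, then reprocess existing channel messages."""
    client.update_pipeline(pipelineName=pipeline_name,
                           pipelineActivities=activities)
    client.start_pipeline_reprocessing(pipelineName=pipeline_name)


activities = build_activities("sample_channel", "sample_lambda", "sample_datastore")

# Requires AWS access, so it is left commented out here:
# import boto3
# apply_and_reprocess(boto3.client("iotanalytics"), "sample_pipeline", activities)
```

Each activity's "next" value must match the "name" of the activity that follows it, which is why the names are defined once per link in the chain.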
After I reprocessed the pipeline, the parsed rows were placed in the datastore. Running the dataset then gave this final result:
"array","field1","field2","field3","__dt"
,"Value1","Value2","Value3","2019-04-26 00:00:00.000"
,"AnotherValue1","AnotherValue2","AnotherValue3","2019-04-26 00:00:00.000"
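Late reply:
Come to think of it, the IoT rule SELECT statement has many built-in clauses and functions, such as 'as', that can transform the input payload. Please check it; it might work for this.
Example - https://docs.aws.amazon.com/iot/latest/developerguide/iot-sql-select.html
Incoming payload published on topic 'topic/subtopic': {"color":{"red":255,"green":0,"blue":0}, "temperature":50}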
SQL:
SELECT color.red as red_value FROM 'topic/subtopic'
Outgoing payload: {"red_value":255}