具有多个输入的 Azure 流分析查询

Azure Stream Analytics query with multiple inputs

问题

我有一个 Azure IoT 中心向 Azure 流分析作业发送消息。

每条消息包含一个'NodeName'。 我有一个 table 'plcnext_nodes' ,每个节点都有一个唯一的 'NodeId' 及其对应的 'NodeName'.

如何使用来自 'plcnext_nodes' table 的输入和 IoT-Hub 消息将事件数据存储在另一个 SQL table 中使用 'NodeId'?

我想使用 'NodeId' 而不是 'NodeName',因为有些名称可能会变得很长,并且在每条消息中一遍又一遍地保存它们是一种存储空间的浪费。


所需的解决方案

我想解析来自 IoT 中心的以下消息:

{
    "NodeName": "ns=5;s=Arp.Plc.Eclr/DI2",
    "NodeDataType": "Boolean",
    "EventValue": 0,
    "EventMeasuredUtcTime": "2019-11-11T12:15:22.4830000Z",
    "EventProcessedUtcTime": "2019-11-11T12:41:57.1706596Z",
    "EventEnqueuedUtcTime": "2019-11-11T12:15:32.1260000Z",
    "IoTHub": {
        ...
    }
}

比较 'NodeName' 和 plcnext_nodes table 得到合适的 'NodeId':

NodeId  NodeName                 NodeDataType
---------------------------------------------
1       ns=5;s=Arp.Plc.Eclr/DI1  Boolean
2       ns=5;s=Arp.Plc.Eclr/DI2  Boolean
...

得到以下输出并插入plcnext_events table:

NodeId  EventValue  EventMeasured
-----------------------------------------------
1       0           2019-11-11 12:15:22.4830000

查询

我在 Azure 流分析上尝试了以下查询:

SELECT
    NodeId,
    EventValue,
    EventMeasuredUtcTime,
    EventEnqueuedUtcTime,
    EventProcessedUtcTime
INTO
    [plcnext_events]
FROM
    [plcnext_nodes],
    [iot_hub]
WHERE
    [iot_hub].NodeName = [plcnext-nodes].NodeName

但是 FROM 不支持 JOIN,而且我无法使用 JOIN 子句,因为它有 DATEDIFF 限制(plcnext_nodes table 没有时间戳)

有办法实现吗?

您可以在流分析中使用引用数据 JOIN 来连接表。

Using Reference Data for Lookups in Stream Analytics

Reference data (also known as a lookup table) is a finite data set that is static or slowly changing in nature, used to perform a lookup or to augment your data streams.

对于您的情况,请转到流分析作业中的 Inputs 部分并添加参考输入。您可以 select Blob 存储SQL 数据库 作为您的来源。

然后您可以将 SQL 查询定义为 return 您需要的参考数据。对于您的情况,您的参考数据查询将如下所示:

SELECT NodeId, NodeName, NodeDataType
FROM dbo.plcnext-nodes

在 SA 中定义引用数据连接后,转到流分析中的 查询 部分并更新您的查询。对于您的场景,您的查询将如下所示(使用 JOIN):

SELECT
   pn.NodeId,
   hub.EventValue,
   hub.EventMeasuredUtcTime,
   hub.EventEnqueuedUtcTime,
   hub.EventProcessedUtcTime
   INTO [plcnext_events]
FROM [iot_hub] hub
JOIN [plcnext-nodes] pn ON pn.NodeName = hub.NodeName