交叉应用从时间戳开始每 10 分钟记录一次的值数组,并在流分析中生成它们的时间戳

cross apply an array of values recorded every 10 mins from a timestamp and generate their timestamps in stream analytics

我有以下流分析输入:

{ "ID":"DEV-001-Test",
  "TMSMUTC":"2021-10-14T14:00:00.000",
  "MSGTYP":"TELEMETRY",
  "THING":[
           {
            "TMSDUTC":"2021-10-14T13:00:00.000",
            "DATA":[
                {
                  "TAGID":"TAGB",
                  "VALUE":30
                },
                {
                  "TAGID":"TAGX",
                  "VALUE":[30.34,245.65,30.34,245.65,245.65,30.34]
                }
               ]
           }
          ]
}

其中“TAGX”的值数组表示从时间戳“TMSDUTC”:“2021-10-14T13:00:00.000”起一小时内每 10 分钟从传感器记录的一个值。 我想知道如何进行查询以提供类似的输出:

output

我的主要疑问是如何从时间戳创建 10 分钟的序列并将值交叉应用到它。

这个不错!请注意,我强烈建议您通过 npm 包使用 VSCode 和 ASA extension when working on these queries. The developer experience is much nicer than in the portal thanks to local testing, and you can also unit test 查询。

我做了以下假设:

  • THING 是单个记录的数组。如果不是这样请告诉我
  • [edited] TMSDUTC 需要在适用时根据每个项目在数组中的位置增加 10 分钟 (TAGX)

有了这个,这里是查询。它分为多个代码块来解释流程,但我也将其整个粘贴到最后一个代码块中。

首先,我们将所有必填字段带到第一级。它使事情更容易阅读,但不仅如此。 GetArrayElements 需要一个数组来交叉应用,但 GetArrayElement(单数)在编译时没有 return 类型。使用中间查询步骤可以解决这个问题。

WITH things AS (
    SELECT
        ID,
        GetArrayElement(THING,0).TMSDUTC AS TMSDUTC,
        MSGTYP AS MessageType,
        GetArrayElement(THING,0).DATA AS DATA
    FROM [input]
),

然后我们展开DATA:

dataAll AS (
    SELECT
        T.ID,
        T.TMSDUTC,
        T.MessageType,
        D.ArrayValue.TAGID AS Tag,
        D.ArrayValue.Value AS [Value]
    FROM things T
    CROSS APPLY GetArrayElements(T.DATA) AS D
),

然后我们为具有 VALUE 类型数组(在您的示例中为 TAGX)的记录创建一个子集。在这里,我通过在运行时检测类型来避免对每个标签进行硬编码。这些记录将需要在接下来的步骤中进行另一轮数组处理。

dataArrays AS (
    SELECT
        A.ID,
        A.TMSDUTC,
        A.MessageType,
        A.Tag,
        A.[Value]
    FROM dataAll A
    WHERE GetType(A.[Value]) = 'array'
),

现在我们可以专注于为这些记录扩展 VALUE。请注意,我们无法一次性完成此操作(在上方的数组上过滤并在下方交叉应用),因为 GetArrayElements 在过滤完成之前检查类型。

[edited] 为了递增 TMSDUTC,我们在其数组中的每个项目的索引上使用 DATEADD(ArrayIndex/ArrayValue 都是 return 从数组扩展中编辑,请参阅下面的文档)。

dataArraysExpanded AS (
    SELECT
        A.ID,
        DATEADD(minute,10*V.ArrayIndex,A.TMSDUTC) AS TMSDUTC,
        A.MessageType,
        A.Tag,
        V.ArrayValue AS [Value]
    FROM dataArrays A
    CROSS APPLY GetArrayElements(A.[Value]) AS V
),

我们将所有东西联合起来:

newSchema AS (
    SELECT ID, TMSDUTC, MessageType, Tag, [Value] FROM dataAll WHERE GetType([Value]) != 'array'
        UNION
    SELECT ID, TMSDUTC, MessageType, Tag, [Value] FROM dataArraysExpanded
)

最后将所有内容插入目标:

SELECT
    *
INTO myOutput
FROM newSchema

[edited] 请注意,结果集上保证的唯一顺序是由时间戳定义的顺序。如果多个记录出现在同一个时间戳上,则默认情况下不保证顺序。在这里,在查询结束时,所有新创建的事件的时间戳仍然是原始事件的时间戳。如果现在需要在新生成的 TMSDUTC 上应用时间逻辑,则需要将这些记录输出到事件中心,并使用 TIMESTAMP BY TMSDUTC 将它们加载到另一个作业中。目前只能在查询的第一步直接更改时间戳。

这里用的是什么:

  • GetArrayElement(单数):doc
  • WITH aka Common Table 表达式 (CTE):doc
  • 交叉应用 + GetArrayElements : doc and doc, plus very good ref
  • 获取类型:doc

整个事情变得更容易 copy/pasting:

WITH things AS (
    SELECT
        ID,
        GetArrayElement(THING,0).TMSDUTC AS TMSDUTC,
        MSGTYP AS MessageType,
        GetArrayElement(THING,0).DATA AS DATA
    FROM [input]
),
dataAll AS (
    SELECT
        T.ID,
        T.TMSDUTC,
        T.MessageType,
        D.ArrayValue.TAGID AS Tag,
        D.ArrayValue.Value AS [Value]
    FROM things T
    CROSS APPLY GetArrayElements(T.DATA) AS D
),
dataArrays AS (
    SELECT
        A.ID,
        A.TMSDUTC,
        A.MessageType,
        A.Tag,
        A.[Value]
    FROM dataAll A
    WHERE GetType(A.[Value]) = 'array'
),
dataArraysExpanded AS (
    SELECT
        A.ID,
        DATEADD(minute,10*V.ArrayIndex,A.TMSDUTC) AS TMSDUTC,
        A.MessageType,
        A.Tag,
        V.ArrayValue AS [Value]
    FROM dataArrays A
    CROSS APPLY GetArrayElements(A.[Value]) AS V
),
newSchema AS (
    SELECT ID, TMSDUTC, MessageType, Tag, [Value] FROM dataAll WHERE GetType([Value]) != 'array'
        UNION
    SELECT ID, TMSDUTC, MessageType, Tag, [Value] FROM dataArraysExpanded
)
SELECT
    *
INTO myOutput
FROM newSchema