交叉应用从时间戳开始每 10 分钟记录一次的值数组,并在流分析中生成它们的时间戳
cross apply an array of values recorded every 10 mins from a timestamp and generate their timestamps in stream analytics
我有以下流分析输入:
{ "ID":"DEV-001-Test",
"TMSMUTC":"2021-10-14T14:00:00.000",
"MSGTYP":"TELEMETRY",
"THING":[
{
"TMSDUTC":"2021-10-14T13:00:00.000",
"DATA":[
{
"TAGID":"TAGB",
"VALUE":30
},
{
"TAGID":"TAGX",
"VALUE":[30.34,245.65,30.34,245.65,245.65,30.34]
}
]
}
]
}
其中“TAGX”的值数组表示从时间戳“TMSDUTC”:“2021-10-14T13:00:00.000”起一小时内每 10 分钟从传感器记录的一个值。
我想知道如何进行查询以提供类似的输出:
output
我的主要疑问是如何从时间戳创建 10 分钟的序列并将值交叉应用到它。
这个不错!请注意,我强烈建议您通过 npm 包使用 VSCode 和 ASA extension when working on these queries. The developer experience is much nicer than in the portal thanks to local testing, and you can also unit test 查询。
我做了以下假设:
- THING 是单个记录的数组。如果不是这样请告诉我
- [edited] TMSDUTC 需要在适用时根据每个项目在数组中的位置增加 10 分钟 (TAGX)
有了这个,这里是查询。它分为多个代码块来解释流程,但我也将其整个粘贴到最后一个代码块中。
首先,我们将所有必填字段带到第一级。它使事情更容易阅读,但不仅如此。 GetArrayElements 需要一个数组来交叉应用,但 GetArrayElement(单数)在编译时没有 return 类型。使用中间查询步骤可以解决这个问题。
WITH things AS (
SELECT
ID,
GetArrayElement(THING,0).TMSDUTC AS TMSDUTC,
MSGTYP AS MessageType,
GetArrayElement(THING,0).DATA AS DATA
FROM [input]
),
然后我们展开DATA
:
dataAll AS (
SELECT
T.ID,
T.TMSDUTC,
T.MessageType,
D.ArrayValue.TAGID AS Tag,
D.ArrayValue.Value AS [Value]
FROM things T
CROSS APPLY GetArrayElements(T.DATA) AS D
),
然后我们为具有 VALUE
类型数组(在您的示例中为 TAGX)的记录创建一个子集。在这里,我通过在运行时检测类型来避免对每个标签进行硬编码。这些记录将需要在接下来的步骤中进行另一轮数组处理。
dataArrays AS (
SELECT
A.ID,
A.TMSDUTC,
A.MessageType,
A.Tag,
A.[Value]
FROM dataAll A
WHERE GetType(A.[Value]) = 'array'
),
现在我们可以专注于为这些记录扩展 VALUE
。请注意,我们无法一次性完成此操作(在上方的数组上过滤并在下方交叉应用),因为 GetArrayElements 在过滤完成之前检查类型。
[edited] 为了递增 TMSDUTC
,我们在其数组中的每个项目的索引上使用 DATEADD(ArrayIndex/ArrayValue 都是 return 从数组扩展中编辑,请参阅下面的文档)。
dataArraysExpanded AS (
SELECT
A.ID,
DATEADD(minute,10*V.ArrayIndex,A.TMSDUTC) AS TMSDUTC,
A.MessageType,
A.Tag,
V.ArrayValue AS [Value]
FROM dataArrays A
CROSS APPLY GetArrayElements(A.[Value]) AS V
),
我们将所有东西联合起来:
newSchema AS (
SELECT ID, TMSDUTC, MessageType, Tag, [Value] FROM dataAll WHERE GetType([Value]) != 'array'
UNION
SELECT ID, TMSDUTC, MessageType, Tag, [Value] FROM dataArraysExpanded
)
最后将所有内容插入目标:
SELECT
*
INTO myOutput
FROM newSchema
[edited] 请注意,结果集上保证的唯一顺序是由时间戳定义的顺序。如果多个记录出现在同一个时间戳上,则默认情况下不保证顺序。在这里,在查询结束时,所有新创建的事件的时间戳仍然是原始事件的时间戳。如果现在需要在新生成的 TMSDUTC 上应用时间逻辑,则需要将这些记录输出到事件中心,并使用 TIMESTAMP BY TMSDUTC 将它们加载到另一个作业中。目前只能在查询的第一步直接更改时间戳。
这里用的是什么:
- GetArrayElement(单数):doc
- WITH aka Common Table 表达式 (CTE):doc
- 交叉应用 + GetArrayElements : doc and doc, plus very good ref
- 获取类型:doc
整个事情变得更容易 copy/pasting:
WITH things AS (
SELECT
ID,
GetArrayElement(THING,0).TMSDUTC AS TMSDUTC,
MSGTYP AS MessageType,
GetArrayElement(THING,0).DATA AS DATA
FROM [input]
),
dataAll AS (
SELECT
T.ID,
T.TMSDUTC,
T.MessageType,
D.ArrayValue.TAGID AS Tag,
D.ArrayValue.Value AS [Value]
FROM things T
CROSS APPLY GetArrayElements(T.DATA) AS D
),
dataArrays AS (
SELECT
A.ID,
A.TMSDUTC,
A.MessageType,
A.Tag,
A.[Value]
FROM dataAll A
WHERE GetType(A.[Value]) = 'array'
),
dataArraysExpanded AS (
SELECT
A.ID,
DATEADD(minute,10*V.ArrayIndex,A.TMSDUTC) AS TMSDUTC,
A.MessageType,
A.Tag,
V.ArrayValue AS [Value]
FROM dataArrays A
CROSS APPLY GetArrayElements(A.[Value]) AS V
),
newSchema AS (
SELECT ID, TMSDUTC, MessageType, Tag, [Value] FROM dataAll WHERE GetType([Value]) != 'array'
UNION
SELECT ID, TMSDUTC, MessageType, Tag, [Value] FROM dataArraysExpanded
)
SELECT
*
INTO myOutput
FROM newSchema
我有以下流分析输入:
{ "ID":"DEV-001-Test",
"TMSMUTC":"2021-10-14T14:00:00.000",
"MSGTYP":"TELEMETRY",
"THING":[
{
"TMSDUTC":"2021-10-14T13:00:00.000",
"DATA":[
{
"TAGID":"TAGB",
"VALUE":30
},
{
"TAGID":"TAGX",
"VALUE":[30.34,245.65,30.34,245.65,245.65,30.34]
}
]
}
]
}
其中“TAGX”的值数组表示从时间戳“TMSDUTC”:“2021-10-14T13:00:00.000”起一小时内每 10 分钟从传感器记录的一个值。 我想知道如何进行查询以提供类似的输出:
output
我的主要疑问是如何从时间戳创建 10 分钟的序列并将值交叉应用到它。
这个不错!请注意,我强烈建议您通过 npm 包使用 VSCode 和 ASA extension when working on these queries. The developer experience is much nicer than in the portal thanks to local testing, and you can also unit test 查询。
我做了以下假设:
- THING 是单个记录的数组。如果不是这样请告诉我
- [edited] TMSDUTC 需要在适用时根据每个项目在数组中的位置增加 10 分钟 (TAGX)
有了这个,这里是查询。它分为多个代码块来解释流程,但我也将其整个粘贴到最后一个代码块中。
首先,我们将所有必填字段带到第一级。它使事情更容易阅读,但不仅如此。 GetArrayElements 需要一个数组来交叉应用,但 GetArrayElement(单数)在编译时没有 return 类型。使用中间查询步骤可以解决这个问题。
WITH things AS (
SELECT
ID,
GetArrayElement(THING,0).TMSDUTC AS TMSDUTC,
MSGTYP AS MessageType,
GetArrayElement(THING,0).DATA AS DATA
FROM [input]
),
然后我们展开DATA
:
dataAll AS (
SELECT
T.ID,
T.TMSDUTC,
T.MessageType,
D.ArrayValue.TAGID AS Tag,
D.ArrayValue.Value AS [Value]
FROM things T
CROSS APPLY GetArrayElements(T.DATA) AS D
),
然后我们为具有 VALUE
类型数组(在您的示例中为 TAGX)的记录创建一个子集。在这里,我通过在运行时检测类型来避免对每个标签进行硬编码。这些记录将需要在接下来的步骤中进行另一轮数组处理。
dataArrays AS (
SELECT
A.ID,
A.TMSDUTC,
A.MessageType,
A.Tag,
A.[Value]
FROM dataAll A
WHERE GetType(A.[Value]) = 'array'
),
现在我们可以专注于为这些记录扩展 VALUE
。请注意,我们无法一次性完成此操作(在上方的数组上过滤并在下方交叉应用),因为 GetArrayElements 在过滤完成之前检查类型。
[edited] 为了递增 TMSDUTC
,我们在其数组中的每个项目的索引上使用 DATEADD(ArrayIndex/ArrayValue 都是 return 从数组扩展中编辑,请参阅下面的文档)。
dataArraysExpanded AS (
SELECT
A.ID,
DATEADD(minute,10*V.ArrayIndex,A.TMSDUTC) AS TMSDUTC,
A.MessageType,
A.Tag,
V.ArrayValue AS [Value]
FROM dataArrays A
CROSS APPLY GetArrayElements(A.[Value]) AS V
),
我们将所有东西联合起来:
newSchema AS (
SELECT ID, TMSDUTC, MessageType, Tag, [Value] FROM dataAll WHERE GetType([Value]) != 'array'
UNION
SELECT ID, TMSDUTC, MessageType, Tag, [Value] FROM dataArraysExpanded
)
最后将所有内容插入目标:
SELECT
*
INTO myOutput
FROM newSchema
[edited] 请注意,结果集上保证的唯一顺序是由时间戳定义的顺序。如果多个记录出现在同一个时间戳上,则默认情况下不保证顺序。在这里,在查询结束时,所有新创建的事件的时间戳仍然是原始事件的时间戳。如果现在需要在新生成的 TMSDUTC 上应用时间逻辑,则需要将这些记录输出到事件中心,并使用 TIMESTAMP BY TMSDUTC 将它们加载到另一个作业中。目前只能在查询的第一步直接更改时间戳。
这里用的是什么:
- GetArrayElement(单数):doc
- WITH aka Common Table 表达式 (CTE):doc
- 交叉应用 + GetArrayElements : doc and doc, plus very good ref
- 获取类型:doc
整个事情变得更容易 copy/pasting:
WITH things AS (
SELECT
ID,
GetArrayElement(THING,0).TMSDUTC AS TMSDUTC,
MSGTYP AS MessageType,
GetArrayElement(THING,0).DATA AS DATA
FROM [input]
),
dataAll AS (
SELECT
T.ID,
T.TMSDUTC,
T.MessageType,
D.ArrayValue.TAGID AS Tag,
D.ArrayValue.Value AS [Value]
FROM things T
CROSS APPLY GetArrayElements(T.DATA) AS D
),
dataArrays AS (
SELECT
A.ID,
A.TMSDUTC,
A.MessageType,
A.Tag,
A.[Value]
FROM dataAll A
WHERE GetType(A.[Value]) = 'array'
),
dataArraysExpanded AS (
SELECT
A.ID,
DATEADD(minute,10*V.ArrayIndex,A.TMSDUTC) AS TMSDUTC,
A.MessageType,
A.Tag,
V.ArrayValue AS [Value]
FROM dataArrays A
CROSS APPLY GetArrayElements(A.[Value]) AS V
),
newSchema AS (
SELECT ID, TMSDUTC, MessageType, Tag, [Value] FROM dataAll WHERE GetType([Value]) != 'array'
UNION
SELECT ID, TMSDUTC, MessageType, Tag, [Value] FROM dataArraysExpanded
)
SELECT
*
INTO myOutput
FROM newSchema