获取在流分析中接收到一系列消息的时间范围

Getting the time frame in which a series of message is received in stream analytics

我正在流式传输包含 posix/epoch 时间字段的事件消息。我正在尝试计算我从设备收到一系列消息的时间范围。

让我们假设以下(简化)输入:

[
   { "deviceid":"device01", "epochtime":1500975613660 },
   { "deviceid":"device01", "epochtime":1500975640194 },
   { "deviceid":"device01", "epochtime":1500975649627 },
   { "deviceid":"device01", "epochtime":1500994473225 },
   { "deviceid":"device01", "epochtime":1500994486725 }
]

我的计算结果应该是每个设备 ID 的消息,例如 {deviceid, start, end}。如果两个事件之间的时间间隔超过一小时,我假设一个新的时间范围开始。在我的示例中,这将导致两次传输:

[
{"deviceid":"device01", "start":1500975613660, "end"=1500975649627},
{"deviceid":"device01", "start":500994473225, "end"=1500994486725}
]

我可以根据文档https://msdn.microsoft.com/en-us/library/azure/mt573293.aspx中的示例2转换纪元时间。但是,我不能在子查询中将转换后的时间戳与 LAG 函数一起使用。 previousTime 的所有值在输出中都为空。

WITH step1 AS (
SELECT
   [deviceid] AS deviceId,
   System.Timestamp AS ts,
   LAG([ts]) OVER (LIMIT DURATION(hour, 24)) as previousTime
FROM 
   input TIMESTAMP BY DATEADD(millisecond, epochtime, '1970-01-01T00:00:00Z') 
)

我不确定如何进行计算以及最好的计算方法是什么。我需要弄清楚事件系列的开始和结束。

非常感谢任何帮助。

我在下面稍微修改了您的查询以获得预期结果:

WITH STEP1 AS (
  SELECT   
    [deviceid] AS deviceId,
    System.Timestamp AS ts,
    LAG(DATEADD(millisecond, epochtime, '1970-01-01T00:00:00Z') ) OVER (LIMIT DURATION(hour, 24)) as previousTime
  FROM 
     input TIMESTAMP BY DATEADD(millisecond, epochtime, '1970-01-01T00:00:00Z') 
)
SELECT * from STEP1

问题是 "ts" 是在当前步骤中定义的,但是当使用 LAG 时,您正在查看来自 FROM 语句的原始消息,它不包含 "ts" 变量.

如果您有任何问题,请告诉我。

谢谢,

JS - Azure 流分析团队