每 15 秒将单个消息组合为 1 个输出
Combine individual messages to 1 output every 15 seconds
是否可以将每 15 秒收到的单独设备消息合并为 1 条汇总设备消息?
例如我想收集 "Temperature"、"RPM" 和 "Gauge" 消息:
Message 1
{
DeviceId: 1
Key: "Temperature"
Value: 15
}
Message 2
{
DeviceId: 1,
Key: "RPM"
Value: 2000
}
进入:
{
DeviceId: 1,
Readings:
{
"Temperature": 15,
"RPM": 2000
"Gauge": "Unknown" (no value in the 15 seconds window, so unknown)
}
}
编辑:
忘记补充了,如果一个设备("Temperature"、"RPM" 和"Gauge")的 3 条消息中有 1 条到达 SA。其他 2 个可能会在接下来的 15 秒内收到。因此,当 SA 作业看到其中一条消息时,它应该开始 "collecting"。 SA 工作适合这个吗?还是我应该寻找其他解决方案?
您可以使用 LAG 函数执行此操作。看起来像这样:
SELECT
DeviceId, [Key], [Value]
LAG(reading) OVER (PARTITION BY DeviceId LIMIT DURATION(hour, 1) WHEN reading IS NOT NULL)
FROM input
LAG 允许您查看指定数量的事件,然后查看给定的分区键(在本例中我使用了 DeviceId)。
感谢您的回复。我已经使用 SessionWindow 解决了这个问题。
是否可以将每 15 秒收到的单独设备消息合并为 1 条汇总设备消息?
例如我想收集 "Temperature"、"RPM" 和 "Gauge" 消息:
Message 1
{
DeviceId: 1
Key: "Temperature"
Value: 15
}
Message 2
{
DeviceId: 1,
Key: "RPM"
Value: 2000
}
进入:
{
DeviceId: 1,
Readings:
{
"Temperature": 15,
"RPM": 2000
"Gauge": "Unknown" (no value in the 15 seconds window, so unknown)
}
}
编辑: 忘记补充了,如果一个设备("Temperature"、"RPM" 和"Gauge")的 3 条消息中有 1 条到达 SA。其他 2 个可能会在接下来的 15 秒内收到。因此,当 SA 作业看到其中一条消息时,它应该开始 "collecting"。 SA 工作适合这个吗?还是我应该寻找其他解决方案?
您可以使用 LAG 函数执行此操作。看起来像这样:
SELECT DeviceId, [Key], [Value] LAG(reading) OVER (PARTITION BY DeviceId LIMIT DURATION(hour, 1) WHEN reading IS NOT NULL) FROM input
LAG 允许您查看指定数量的事件,然后查看给定的分区键(在本例中我使用了 DeviceId)。
感谢您的回复。我已经使用 SessionWindow 解决了这个问题。