Average over some values, determined by entries from other messages

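My Stream Analytics job ingests a stream of messages of different types. Some are of type: telemetry, others of type: flags. I need to calculate the average of some telemetry values over a moving window, but only over values that were preceded by a flag message with the value true.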

In other words: the flag messages switch on or off whether telemetry is considered when calculating the average.

I tried the following query:

SELECT
    devId,
    type,
    AVG ("myValue") OVER ( LIMIT DURATION (second, 30) WHEN
        LAG (value) OVER (LIMIT DURATION(minute, 5) WHEN type='flag' and text='myValueFlag') = 1
    ) as 'myValueAvg',
    MAX("ts") as 'ts'
INTO "eventhub-output"
FROM "iothub-input" TIMESTAMP BY "ts"
WHERE type = 'telemetry'
GROUP BY devId, type, SlidingWindow(second, 30)

But I get this error message:

In a query with GROUP BY, Analytic functions can not be used neither in the GROUP BY nor in SELECT, unless they are argument to an aggregate function such as in SUM(LAG(x) OVER(LIMIT DURATION(ss, 5)).

I'm not sure how to proceed from here. The documentation states:

LAG isn't affected by predicates in the WHERE clause, join conditions in the JOIN clause, or grouping expressions in the GROUP BY clause of the current query because it's evaluated before those clauses.

So I assumed the query above should work.

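I finally got it to work. The trick is not to use the AVG OVER clause at all. Instead, I use a plain AVG over a subquery that filters the relevant data points: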

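WITH relevant_data AS (
    -- Keep only telemetry events whose most recent 'myValueFlag' message
    -- within the last 5 minutes had value 1.
    SELECT 
        devId, 
        myValue, 
        ts
    FROM "iothub-input" TIMESTAMP BY "ts"
    WHERE myValue IS NOT NULL
    AND type = 'telemetry'
    -- LAG(value, 1, 1): the third argument is the default, so telemetry
    -- counts as enabled when no flag message was seen in the window.
    AND LAG (value,1,1) OVER ( LIMIT DURATION(minute, 5) WHEN type='flag' and text='myValueFlag') = 1
)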

SELECT 
    devId,
    AVG ( myValue ) as 'myValueAvg',
    MAX("ts") as 'ts'
FROM relevant_data
GROUP BY devId, SlidingWindow(second, 30)
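
One thing to watch for: the LAG above has no PARTITION BY, so as far as I understand it looks at the most recent flag message from any device. If flags are meant to apply per device, a variant along these lines might be closer to the intent. This is only a sketch: it assumes that every flag message carries the devId of the device it applies to (not stated above), and it reuses the "eventhub-output" name from the original query for the INTO clause.

```sql
WITH relevant_data AS (
    SELECT
        devId,
        myValue,
        ts
    FROM "iothub-input" TIMESTAMP BY "ts"
    WHERE myValue IS NOT NULL
    AND type = 'telemetry'
    -- Assumption: flag messages carry the same devId as the telemetry they gate.
    -- PARTITION BY devId makes LAG return the last flag of the same device only.
    AND LAG (value, 1, 1) OVER (
            PARTITION BY devId
            LIMIT DURATION(minute, 5)
            WHEN type = 'flag' AND text = 'myValueFlag'
        ) = 1
)

SELECT
    devId,
    AVG ( myValue ) as 'myValueAvg',
    MAX("ts") as 'ts'
INTO "eventhub-output"
FROM relevant_data
GROUP BY devId, SlidingWindow(second, 30)
```

As in the query above, the default of 1 means a device with no flag message in the last 5 minutes is treated as enabled. Change the third LAG argument to 0 if telemetry should be excluded until a flag has been seen.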