wso2 cep Siddhiql

wso2 cep Siddhiql

我有多个传感器将测量事件发送到流中。一个事件由{参数、值、时间戳}组成。我想观察这些值 window 几天,检查趋势并对这些不同传感器监测的设备进行诊断。

  1. 按参数划分流。
    来自 inputStream[参数='A']
    select *
    插入 Astream;

接收到的每个参数依此类推。

  1. 时间window,比如 60 秒,计算线性回归以找到变化。
    来自 Astream#timeseries:lengthTimeRegress(60000, value, timestamp)
    select beta1 * 100 作为 AChange
    插入 AChangeStream;

这是我为每个指标流做的。 3. 一旦我掌握了每个流的趋势,我就会收集每个流的变化值并检查它们是否满足条件。

    从每个 e1=AChangeStream[e1.AChangeStream > 0.5], e2=BChangeStream[e2.BChangeStream 0.15]
    select 'condition 1 alarm' 作为消息
    插入警报流;

上面的siddhi ql会在window时间内检测到6个参数的变化吗?

除了您遗漏的一些小问题外,您提供的查询的要点是正确的。当你说 6 个参数时,我相信你有类似于参数 A、B、C、D、E 和 F 的东西。最终你似乎想要找到一个匹配给定条件 [1] 的事件序列。

考虑到只有2个参数A和B,你可以用Siddhi语言写查询如下,达到你的要求。


@Import('input:1.0.0')
define stream inputStream (parameter string, value double, timestamp long);    


from inputStream[parameter=='A']
select *
insert into Astream;    

from inputStream[parameter=='B']
select *
insert into Bstream;    

from Astream#timeseries:lengthTimeRegress(60000, 10000, value, timestamp)
select beta1*100 as AChange 
insert into AChangeStream;    

from Bstream#timeseries:lengthTimeRegress(60000, 10000, value, timestamp)
select beta1*100 as BChange 
insert into BChangeStream;    

from every e1=AChangeStream[e1.AChange > 0.5], e2=BChangeStream[e2.BChange > 0.15]
select 'condition 1 alarm' as message
insert into alertStream;


请注意以下事项。

  1. 在lengthTimeRegress函数中,您需要提供4个强制参数,如[2]中指定的。您在编写的查询中错过了批量大小。用于回归计算的最大事件数由批量大小指定。
  2. 在顺序条件中,需要使用一个参数。不是流名称。你误写的 e1.AChangeStream > 0.5 必须改成 e1.AChange > 0.5

[1] https://docs.wso2.com/display/CEP420/SiddhiQL+Guide+3.1#SiddhiQLGuide3.1-Sequence
[2] https://docs.wso2.com/display/SIDDHIEXTENSIONS/Regression