我如何使用 siddhi 从事件的连续值中挑选出值

How can i pick out value from consecutive value of event by using siddhi

例如,即数据:

1,1470732420000,0
2,1470732421000,0
3,1470732422000,0
4,1470732423000,86
5,1470732424000,87
6,1470732425000,88
7,1470732426000,84
8,1470732427000,0
9,1470732428000,0
10,1470732429000,0
11,1470732430000,89
12,1470732431000,89
13,1470732432000,87
14,1470732433000,89
15,1470732434000,85
16,1470732435000,89
17,1470732436000,89
18,1470732437000,87
19,1470732438000,86
20,1470732439000,88
21,1470732440000,0
22,1470732441000,0
23,1470732442000,0
24,1470732443000,87
25,1470732444000,85
26,1470732445000,86
27,1470732446000,0
28,1470732447000,0
29,1470732448000,0
30,1470732449000,0

第一列是id,第二列是timestamp,第三列是value,时间戳之间间隔 1 秒。

我想监控事件的值,如果我发现值>=85(例如id=4),我将开始计数,如果接下来的两个连续值>=85(例如id=5/id= 6), 然后我将事件的第三个值放入 OutputStream.(e.g. id=6,value=88,timestamp=1470732425000)

同时清除小于85的计数和等待值(如id=7,value=84),当发现value>=85(如id=11, value=89)我会开始计数,如果接下来的两个连续值>=85(例如id=12/id=13),那么我会把事件的第三个值放到OutputStream。(例如id=13,value=87 ,时间戳=1470732432000)...

所有这些都是我想做的,在我 post 这个问题之前,我在 中得到了答案,我试过这个代码:

from every a1=InputStream[value>=85], a2=InputStream[value>=85]+, a3=InputStream[value<85]
select a2[1].id, a2[1].value
having (not (a2[1] is null))
insert into OutPutStream;

它有效,但我发现它会在值 <=85 之后将值插入到 OutputStream,我想要的是如果我得到三个连续的值 >=85 然后我立即插入值。(如果下一个值一直>=85,我不想等待) 事实上,我只想在连续三秒内记录第三秒的值 value(>=85) 。 我正在使用 wso2das-3.1.0-SNAPSHOT。

尽管 DAS (Siddhi) 支持 sequence/pattern 处理,但根据您的要求,您可能需要编写自定义扩展。我已经编写了一个示例 window 处理器扩展来满足您的要求(source code). Download and place siddhi-extension-condition-window-1.0.jar in <das_home>/repository/components/lib/ directory and restart the server. Refer to the test case 以了解扩展的用法。