基于时间的 siddhi 数据聚合
siddhi data aggrigation based on time
我正在尝试根据时间 windows 聚合传感器数据,并在达到 30 秒 window 后将其写入 Cassandra(汇总)。
例如,名为 "temp" 的传感器在 30 秒内发送 3 个读数。我想获取该传感器最近 30 秒的平均值,并在 window 完成时将平均值写入 Cassandra。
这是我的代码
BasicConfigurator.configure();
// Create Siddhi Application
String siddhiApp = "define stream SensorEventStream (sensorid string, value double); " +
" " +
"@info(name = 'query1') " +
"from SensorEventStream#window.time(30 sec) " +
"select sensorid, avg(value) as value " +
"group by sensorid " +
"insert into AggregateSensorEventStream ;";
// Creating Siddhi Manager
SiddhiManager siddhiManager = new SiddhiManager();
//Generating runtime
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
//Adding callback to retrieve output events from query
siddhiAppRuntime.addCallback("AggregateSensorEventStream", new StreamCallback() {
@Override
public void receive(org.wso2.siddhi.core.event.Event[] events) {
EventPrinter.print(events);
}
});
//Retrieving input handler to push events into Siddhi
InputHandler inputHandler = siddhiAppRuntime.getInputHandler("SensorEventStream");
//Starting event processing
siddhiAppRuntime.start();
//Sending events to Siddhi
inputHandler.send(new Object[]{"Temp", 26d});
Thread.sleep(1000);
inputHandler.send(new Object[]{"Temp", 25d});
Thread.sleep(1000);
inputHandler.send(new Object[]{"Temp", 24d});
Thread.sleep(60000);
inputHandler.send(new Object[]{"Temp", 23d});
//Shutting down the runtime
siddhiAppRuntime.shutdown();
//Shutting down Siddhi
siddhiManager.shutdown();
输出是这样的
0 [main] INFO org.wso2.siddhi.core.util.EventPrinter - [Event{timestamp=1552281656960, data=[Temp, 26.0], isExpired=false}]
1002 [main] INFO org.wso2.siddhi.core.util.EventPrinter - [Event{timestamp=1552281657971, data=[Temp, 25.5], isExpired=false}]
2003 [main] INFO org.wso2.siddhi.core.util.EventPrinter - [Event{timestamp=1552281658972, data=[Temp, 25.0], isExpired=false}]
62004 [main] INFO org.wso2.siddhi.core.util.EventPrinter - [Event{timestamp=1552281718972, data=[Temp, 23.0], isExpired=false}]
从这个演示代码中我看到它立即发送 3 个事件的第一个平均温度,并且在 30 秒后 window,它没有做任何事情。然后打印 23.
如何在 window 汇总 30 秒后收到通知?我认为这就是接收函数的作用。
我不确定我是否误解了这里的功能。这对 siddi 来说可能吗?
这是预期的行为,window 是滑动 window。在这里,当第一个事件到来时,第一秒,window 仅保留第一个事件,因此平均值为 26。然后当第二个事件到来时,window 具有 26d 和 25d,然后平均在25.5。同样,第 3 秒平均 25d。然后在第 31、32 和 33 秒,这些事件将从 window 起过期。因此,当您的第 4 个事件到来时(第 63 秒),window 中只有最新的事件,因此平均值将是该值本身。 window 根据事件发生前最后 30 秒内接收到的事件计算事件到达后的平均值。
从你的问题来看,你似乎想要 timeBatch window。在这里,平均值仅在批次结束时计算。例如,在这种情况下,第 30、60、90 秒等等。请参阅 timeBatch 示例文档。
我正在尝试根据时间 windows 聚合传感器数据,并在达到 30 秒 window 后将其写入 Cassandra(汇总)。
例如,名为 "temp" 的传感器在 30 秒内发送 3 个读数。我想获取该传感器最近 30 秒的平均值,并在 window 完成时将平均值写入 Cassandra。
这是我的代码
BasicConfigurator.configure();
// Create Siddhi Application
String siddhiApp = "define stream SensorEventStream (sensorid string, value double); " +
" " +
"@info(name = 'query1') " +
"from SensorEventStream#window.time(30 sec) " +
"select sensorid, avg(value) as value " +
"group by sensorid " +
"insert into AggregateSensorEventStream ;";
// Creating Siddhi Manager
SiddhiManager siddhiManager = new SiddhiManager();
//Generating runtime
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
//Adding callback to retrieve output events from query
siddhiAppRuntime.addCallback("AggregateSensorEventStream", new StreamCallback() {
@Override
public void receive(org.wso2.siddhi.core.event.Event[] events) {
EventPrinter.print(events);
}
});
//Retrieving input handler to push events into Siddhi
InputHandler inputHandler = siddhiAppRuntime.getInputHandler("SensorEventStream");
//Starting event processing
siddhiAppRuntime.start();
//Sending events to Siddhi
inputHandler.send(new Object[]{"Temp", 26d});
Thread.sleep(1000);
inputHandler.send(new Object[]{"Temp", 25d});
Thread.sleep(1000);
inputHandler.send(new Object[]{"Temp", 24d});
Thread.sleep(60000);
inputHandler.send(new Object[]{"Temp", 23d});
//Shutting down the runtime
siddhiAppRuntime.shutdown();
//Shutting down Siddhi
siddhiManager.shutdown();
输出是这样的
0 [main] INFO org.wso2.siddhi.core.util.EventPrinter - [Event{timestamp=1552281656960, data=[Temp, 26.0], isExpired=false}]
1002 [main] INFO org.wso2.siddhi.core.util.EventPrinter - [Event{timestamp=1552281657971, data=[Temp, 25.5], isExpired=false}]
2003 [main] INFO org.wso2.siddhi.core.util.EventPrinter - [Event{timestamp=1552281658972, data=[Temp, 25.0], isExpired=false}]
62004 [main] INFO org.wso2.siddhi.core.util.EventPrinter - [Event{timestamp=1552281718972, data=[Temp, 23.0], isExpired=false}]
从这个演示代码中我看到它立即发送 3 个事件的第一个平均温度,并且在 30 秒后 window,它没有做任何事情。然后打印 23.
如何在 window 汇总 30 秒后收到通知?我认为这就是接收函数的作用。
我不确定我是否误解了这里的功能。这对 siddi 来说可能吗?
这是预期的行为,window 是滑动 window。在这里,当第一个事件到来时,第一秒,window 仅保留第一个事件,因此平均值为 26。然后当第二个事件到来时,window 具有 26d 和 25d,然后平均在25.5。同样,第 3 秒平均 25d。然后在第 31、32 和 33 秒,这些事件将从 window 起过期。因此,当您的第 4 个事件到来时(第 63 秒),window 中只有最新的事件,因此平均值将是该值本身。 window 根据事件发生前最后 30 秒内接收到的事件计算事件到达后的平均值。
从你的问题来看,你似乎想要 timeBatch window。在这里,平均值仅在批次结束时计算。例如,在这种情况下,第 30、60、90 秒等等。请参阅 timeBatch 示例文档。