为什么所有事件都是成批出现而不是一个接一个出现?
Why do all events come in bulk rather than one by one?
我正在使用外部定时 window 测试一个非常简单的 CEP 查询。查询是define stream LoginEvents (timeStamp long, ip string, phone string); @info(name = 'query1') from LoginEvents#window.externalTime(timeStamp,5 sec) select timeStamp, ip insert all events into uniqueIps;
;
查看单元测试here,我认为会发生回调将被调用 9 次,传入事件调用 5 次,过期事件调用 4 次。相反,它只被调用一次。为什么会这样,我怎样才能达到为每个事件调用回调的状态?
这里所有的事件都没有任何时间延迟地发送给 Siddhi,因此 Siddhi 一起处理所有这些事件。这就是事件作为批量返回的原因。
如果您希望为每个事件调用回调,则必须扩展 StreamCallback 或 QueryCallback 并迭代返回的事件数组并为每个事件调用回调。
这是我的实现。我不得不复制 send
方法,因为它是私有的。您可以替换我对时间戳 getter 的实现,因为它非常适合我的用例。
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
import java.util.Arrays;
public abstract class CustomQueryCallback extends QueryCallback {
private static final Logger log = LoggerFactory.getLogger(CustomQueryCallback.class);
public void receiveStreamEvent(ComplexEventChunk complexEventChunk) {
while (complexEventChunk.hasNext()) {
ComplexEvent streamEvent = complexEventChunk.next();
Event event = new Event(streamEvent.getOutputData().length).copyFrom(streamEvent);
Event[] events = new Event[]{event};
long timestamp = (streamEvent.getType() == StreamEvent.Type.EXPIRED ? streamEvent.getTimestamp() : (long) streamEvent.getOutputData()[2]);
if (streamEvent.getType() == StreamEvent.Type.EXPIRED){
send(timestamp, null, events);
} else {
send(timestamp, events, null);
}
}
}
private void send(long timeStamp, Event[] currentEvents, Event[] expiredEvents) {
try {
receive(timeStamp, currentEvents, expiredEvents);
} catch (RuntimeException e) {
log.error("Error on sending events" + Arrays.deepToString(currentEvents) + ", " + Arrays.deepToString(expiredEvents), e);
}
}
}
我正在使用外部定时 window 测试一个非常简单的 CEP 查询。查询是define stream LoginEvents (timeStamp long, ip string, phone string); @info(name = 'query1') from LoginEvents#window.externalTime(timeStamp,5 sec) select timeStamp, ip insert all events into uniqueIps;
;
查看单元测试here,我认为会发生回调将被调用 9 次,传入事件调用 5 次,过期事件调用 4 次。相反,它只被调用一次。为什么会这样,我怎样才能达到为每个事件调用回调的状态?
这里所有的事件都没有任何时间延迟地发送给 Siddhi,因此 Siddhi 一起处理所有这些事件。这就是事件作为批量返回的原因。
如果您希望为每个事件调用回调,则必须扩展 StreamCallback 或 QueryCallback 并迭代返回的事件数组并为每个事件调用回调。
这是我的实现。我不得不复制 send
方法,因为它是私有的。您可以替换我对时间戳 getter 的实现,因为它非常适合我的用例。
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
import java.util.Arrays;
public abstract class CustomQueryCallback extends QueryCallback {
private static final Logger log = LoggerFactory.getLogger(CustomQueryCallback.class);
public void receiveStreamEvent(ComplexEventChunk complexEventChunk) {
while (complexEventChunk.hasNext()) {
ComplexEvent streamEvent = complexEventChunk.next();
Event event = new Event(streamEvent.getOutputData().length).copyFrom(streamEvent);
Event[] events = new Event[]{event};
long timestamp = (streamEvent.getType() == StreamEvent.Type.EXPIRED ? streamEvent.getTimestamp() : (long) streamEvent.getOutputData()[2]);
if (streamEvent.getType() == StreamEvent.Type.EXPIRED){
send(timestamp, null, events);
} else {
send(timestamp, events, null);
}
}
}
private void send(long timeStamp, Event[] currentEvents, Event[] expiredEvents) {
try {
receive(timeStamp, currentEvents, expiredEvents);
} catch (RuntimeException e) {
log.error("Error on sending events" + Arrays.deepToString(currentEvents) + ", " + Arrays.deepToString(expiredEvents), e);
}
}
}