WSO2 Siddhi 复杂事件处理器问题

WSO2 Siddhi Complex Event Processor question

环境:WSO2 流处理器 4.3.0

假设我有两个非常简单的流:

实时传送新创建的请求(未完成)的流 (t1)

RequestStream(requestId)

请求实时完成时(t2)出现 requestIds 的流

FulfilmentStream(requestId)

保证 t2 总是 > t1

我如何实现 SiddhiQL 语句来识别出现在 RequestStream (event1) 而未出现在 FulfilmentStream (event2)[=29] 中的 requestId =] 自事件 1 开始 5 分钟后?

基于 Tishan 答案的 Working Siddhi App:

@App:name('FailedToFulfillInAmountOfTime')

@source(
type="kafka",
topic.list="some_topic",
threading.option="single.thread",
group.id="some_group",
bootstrap.servers="xxx.xxx.xxx.xxx:6667",
@Map(type="json", @attributes(request_id = '$.alarm_id', severity = '$.severity', managed_object = '$.ManagedObject')))
define stream OrigAlarmStream (request_id int, severity string, managed_object string); 

@sink(type='log', prefix='Got this execution request')
define stream RequestStream (request_id int, severity string, managed_object string);

@sink(type='log', prefix='Got this fulfillment confirmation:')
define stream FulfillmentStream (request_id int, severity string, managed_object string);

@sink(type='log', prefix='This fulfillment was not done within 1 min:')
define stream AlertStream(request_id int);

@info(name='getExpiredRequests')
from every e1=RequestStream -> not FulfillmentStream[e1.request_id == request_id] for 1 min
select e1.request_id
insert into AlertStream;

@info(name='CopyFulfillments')
from OrigAlarmStream[severity == 'Clear']
select request_id, severity, managed_object
insert into FulfillmentStream;

@info(name='CopyRequests')
from OrigAlarmStream[severity != 'Clear']
select request_id, severity, managed_object
insert into RequestStream;

您可以使用逻辑模式来实现您的要求。请参考以下查询。

from e1=RequestStream -> not e2=FulfilmentStream[e1.requestId == e2.requestId] for '5 min'
select e1.requestId as requestId
insert into AlertStream;

这里我们定义了一个没有条件的模式。这将在 RequestStream 中的事件到来时触发,并且在 5 分钟内没有事件在 5 分钟内进入 FulfilmentStream。请参阅 logical patterns 了解更多信息。