Stream - 将相似的事件数据合并到一个事件中

Stream - Merge similar events data into one event

我想根据其中一个字段将传入事件合并为一个事件。

输入事件:

{
  ID: '123',
  eventType: 'a',
  eventCode: 1
},
{
  ID: '123',
  eventType: 'b',
  eventCode: 2
},
{
  ID: '123',
  eventType: 'c',
  eventCode: 3
}

预期输出:

{
  ID: '123',
  events: [{
    eventType: 'a',
    eventCode: 1
  },
  {
    eventType: 'b',
    eventCode: 2
  },
  {
    eventType: 'c',
    eventCode: 3
  }]
}

我根据 window 4 对事件进行分组。因此,我需要处理这 4 个事件,合并它们并将其传递到下一步。

用例: 我想使用生成的输出存储在 MongoDB 或将其传递到外部服务。

这可以使用 Siddhi 吗?

注意:我看到 similar question 已经被问过,但回复是 5 年前的,Siddhi 从那时起已经走了很长一段路。

您可以使用以下 Siddhi 应用程序来实现您的要求。我也使用了 string extension to do this. But please note generated output is exactly the one you requested. If you want a proper JSON output you might have to utilize execution json extention。请按照自述文件了解有关扩展使用的详细信息。

@App:name("testJsonConcat")
@App:description("Description of the plan")

-- Please refer to https://docs.wso2.com/display/SP400/Quick+Start+Guide on getting started with SP editor. 

define stream inputStream(id string, eventType string, eventCode int);

partition with (id of inputStream)
begin
from inputStream
select id, str:concat("{eventType: '", eventType, "' , eventCode :",eventCode,"}") as jsonString
insert into #formattedStream;

from #formattedStream#window.lengthBatch(4)
select str:concat("{ ID : '", id, "',events: [", str:groupConcat(jsonString),"]}") as result
insert into concatStream;
end;

from concatStream#log()
select *
insert into temp;