Stream - 将相似的事件数据合并到一个事件中
Stream - Merge similar events data into one event
我想根据其中一个字段将传入事件合并为一个事件。
输入事件:
{
ID: '123',
eventType: 'a',
eventCode: 1
},
{
ID: '123',
eventType: 'b',
eventCode: 2
},
{
ID: '123',
eventType: 'c',
eventCode: 3
}
预期输出:
{
ID: '123',
events: [{
eventType: 'a',
eventCode: 1
},
{
eventType: 'b',
eventCode: 2
},
{
eventType: 'c',
eventCode: 3
}]
}
我根据 window 4 对事件进行分组。因此,我需要处理这 4 个事件,合并它们并将其传递到下一步。
用例:
我想使用生成的输出存储在 MongoDB 或将其传递到外部服务。
这可以使用 Siddhi 吗?
注意:我看到 similar question 已经被问过,但回复是 5 年前的,Siddhi 从那时起已经走了很长一段路。
您可以使用以下 Siddhi 应用程序来实现您的要求。我也使用了 string extension to do this. But please note generated output is exactly the one you requested. If you want a proper JSON output you might have to utilize execution json extention。请按照自述文件了解有关扩展使用的详细信息。
@App:name("testJsonConcat")
@App:description("Description of the plan")
-- Please refer to https://docs.wso2.com/display/SP400/Quick+Start+Guide on getting started with SP editor.
define stream inputStream(id string, eventType string, eventCode int);
partition with (id of inputStream)
begin
from inputStream
select id, str:concat("{eventType: '", eventType, "' , eventCode :",eventCode,"}") as jsonString
insert into #formattedStream;
from #formattedStream#window.lengthBatch(4)
select str:concat("{ ID : '", id, "',events: [", str:groupConcat(jsonString),"]}") as result
insert into concatStream;
end;
from concatStream#log()
select *
insert into temp;
我想根据其中一个字段将传入事件合并为一个事件。
输入事件:
{
ID: '123',
eventType: 'a',
eventCode: 1
},
{
ID: '123',
eventType: 'b',
eventCode: 2
},
{
ID: '123',
eventType: 'c',
eventCode: 3
}
预期输出:
{
ID: '123',
events: [{
eventType: 'a',
eventCode: 1
},
{
eventType: 'b',
eventCode: 2
},
{
eventType: 'c',
eventCode: 3
}]
}
我根据 window 4 对事件进行分组。因此,我需要处理这 4 个事件,合并它们并将其传递到下一步。
用例: 我想使用生成的输出存储在 MongoDB 或将其传递到外部服务。
这可以使用 Siddhi 吗?
注意:我看到 similar question 已经被问过,但回复是 5 年前的,Siddhi 从那时起已经走了很长一段路。
您可以使用以下 Siddhi 应用程序来实现您的要求。我也使用了 string extension to do this. But please note generated output is exactly the one you requested. If you want a proper JSON output you might have to utilize execution json extention。请按照自述文件了解有关扩展使用的详细信息。
@App:name("testJsonConcat")
@App:description("Description of the plan")
-- Please refer to https://docs.wso2.com/display/SP400/Quick+Start+Guide on getting started with SP editor.
define stream inputStream(id string, eventType string, eventCode int);
partition with (id of inputStream)
begin
from inputStream
select id, str:concat("{eventType: '", eventType, "' , eventCode :",eventCode,"}") as jsonString
insert into #formattedStream;
from #formattedStream#window.lengthBatch(4)
select str:concat("{ ID : '", id, "',events: [", str:groupConcat(jsonString),"]}") as result
insert into concatStream;
end;
from concatStream#log()
select *
insert into temp;