使用 WSO2 CEP 更新 MySQL 数据库

Update MySQL Database using WSO2 CEP

有没有什么方法可以在不更新流中的空值的情况下更新 MySQL Db。如果我的输入数据流包含一些空值,目前该空值使用 "data_empty" 值表示。那时 CEP 使用该值更新数据库 ("data_empty")。我的目标是在不更新空值的情况下更新其余内容。是否可以使用 siddhi 和 WSO2 CEP。

@Plan:name('DBUpdateExecutionPlan')
@Import('testStream:1.0.0')
define stream input (id string, param1 string, param2 string);

@Export('testOutStream:1.0.0')
define stream output (id string, param1 string, param2 string);

@from(eventtable = 'rdbms' , datasource.name = 'MYSQL' , table.name = 'cep')
define table cepTable (id string, param1 string, param2 string) ;

from input#window.time(0 sec)
select * 
update cepTable on id == cepTable.id;

仅将非空值(在您的方案中不是 'data_empty')更新到数据库有点困难。但是在 Siddhi 中,有一个名为 ifThenElse(condition, value if true, value if false) 的函数,可以在您的场景中使用。请参阅下面的示例执行计划,了解如何使用 ifThenElse() 和 table 更新(类似于您的用例)。

@Plan:name('IfThenElseExecutionPlan')

@Import('inputStream:1.0.0')
define stream dataIn (roomId int, roomType string, roomTemp float);

@Export('outputStream:1.0.0')
define stream dataOut (roomId int, roomType string, roomTemp float);

@From(eventtable='rdbms', datasource.name='cepdatabase', table.name='roomTable')
define table roomTable (roomId int, roomType string, roomTemp float);

from dataIn[not((roomTable.roomId == roomId) in roomTable)]
insert into updateStream;

from dataIn join roomTable
on roomTable.roomId == dataIn.roomId
select  dataIn.roomId as roomId, 
        ifThenElse(dataIn.roomType=='data_empty', roomTable.roomType, dataIn.roomType) as roomType, 
        ifThenElse(dataIn.roomTemp==0.0f, roomTable.roomTemp, dataIn.roomTemp) as roomTemp
insert into updateStream;

from updateStream
insert overwrite roomTable
on roomTable.roomId == roomId;