WSO2 DAS SiddhiQL:初始化事件 table 并在之后更新

WSO2 DAS SiddhiQL: Initialize an event table and update it after

我有一些相关问题:

  1. 首先,我想用默认值和 100 行初始化一个事件 table,如下图所示:

  1. 其次,初始化完成后我想更新这个table。 query1 执行完成后,如何在同一个执行计划中执行 query2?

  2. 最后,我有一个具有 'altitude' 属性的事件。在我的执行计划中,对于每个事件,我想在我的事件 table 的每一行中递增 count1,其中 num 列小于高度。我试过了,但这并没有增加所有行的计数。

    FROM inputStream JOIN counterTable
    SELECT count1+1 as count1, altitude as tempNum
    update counterTable on counterTable.count1 < tempNum;
    
    FROM inputStream JOIN counterTable
    SELECT counterTable.num as theAltitude, counterTable.count1 as countAltitude
    INSERT INTO outputStream;
    
  1. 如果要在每次部署执行计划时都进行初始化,则应使用内存事件table(如下所示)。否则,您可以简单地使用已经初始化的 RDBMS event table

  2. 查询将 运行 按照它们定义的顺序进行,但该过程将针对每个事件发生(不是作为批处理,即如果有两个查询消耗 inputStream,当事件 1 进入 inputStream 时,它会去查询 1,然后去查询 2,然后只有事件 2 会被消耗..)。

  3. 参考下面的代码片段;

    /* Define the trigger to be used with initialization */
    define trigger triggerStream at 'start';
    
    /* Define streams */
    define stream inputStream (altitude int);
    define stream outputStream (theAltitude long, countAltitude int);
    
    /* Define table */
    define table counterTable (num long, count1 int, count2 int, tempNum int);
    
    /* Iterate and generate 100 events */
    from triggerStream[count() < 100]
    insert into triggerStream;
    
    /* Using above 100 events, initialize the event table */
    from triggerStream
    select count() as num, 0 as count1, 0 as count2, 0 as tempNum
    insert into counterTable;
    
    /* Perform the update logic here */
    from inputStream as i join counterTable as c
        on c.count1 < i.altitude
    select c.num, (c.count1 + 1) as count1, c.count2, altitude as tempNum
    insert into updateStream;
    
    from updateStream
    insert overwrite counterTable
        on counterTable.num == num;
    
    /* Join the table and get the updated results */
    from inputStream join counterTable as c
    select c.num as theAltitude, c.count1 as countAltitude
    insert into outputStream;
    

Table值可以初始化如下。

@info(name='initialize table values')
from inputStream[count()==1]
select 1 as id, 0 as counter1, 0 as counter2, 0 as counter3
insert into counterTable;