Session Windows (Context) in Esper

I'm looking for a way to group or window Esper events in dynamic windows, similar to what Apache Flink calls Session Windows.

I would like to create a context per session, but so far I haven't found a way to capture the sessions. An initial (not working) example:

CREATE SCHEMA EventX AS (sensorId string, timestamp long, value double);

CREATE SCHEMA SessionEvent AS (sensorId string, totalValue double, events EventX[]);

// Unsure about the context definition
CREATE CONTEXT sensorSessionCtx
 CONTEXT sensorCtx PARTITION BY sensorId FROM EventX,
 CONTEXT sessionCtx INITIATED BY EventX TERMINATED BY pattern [every EventX -> (timer:interval(3 sec) and not EventX)];

CONTEXT sensorSessionCtx
INSERT INTO SessionEvent
SELECT sensorId
,      SUM(value) AS totalValue
,      window(*)  AS events
FROM   EventX#keepall
OUTPUT LAST WHEN TERMINATED;

@Name('Sessions') SELECT * FROM   SessionEvent;

And some test data for Esper EPL Online:
EventX = {sensorId='A', timestamp=0, value=1.1}
t=t.plus(1 seconds)
EventX = {sensorId='A', timestamp=1000, value=3.2}
t=t.plus(1 seconds)
EventX = {sensorId='B', timestamp=2000, value=8.4}
t=t.plus(1 seconds)
EventX = {sensorId='A', timestamp=3000, value=2.7}
EventX = {sensorId='B', timestamp=3000, value=0.2}
t=t.plus(3 seconds)
EventX = {sensorId='A', timestamp=6000, value=3.1}

Expected output:

SessionEvent={sensorId='A', totalValue=7.0, events={[EventX={sensorId='A', timestamp=0, value=1.1},EventX={sensorId='A', timestamp=1000, value=3.2},EventX={sensorId='A', timestamp=3000, value=2.7}]}}
SessionEvent={sensorId='B', totalValue=8.6, events={[EventX={sensorId='B', timestamp=2000, value=8.4},EventX={sensorId='B', timestamp=3000, value=0.2}]}}
SessionEvent={sensorId='A', totalValue=3.1, events={[EventX={sensorId='A', timestamp=6000, value=3.1}]}}
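To make the expected output easier to check, here is a rough plain-Scala model of gap-based session windows. It is not Esper code, and the names `EventX`, `SessionEvent`, `Sessionizer.sessionize` and `gapMs` are illustrative assumptions. It uses each event's `timestamp` field as the clock, whereas Esper uses engine time; in the test data above the two coincide. A sensor's session keeps growing while consecutive events are less than 3000 ms apart, and a new one starts once the gap reaches 3000 ms, which is why the event at timestamp 6000 opens a second session for 'A'.

```scala
// Plain-Scala model of gap-based session windows (not the Esper API; names are illustrative).
final case class EventX(sensorId: String, timestamp: Long, value: Double)
final case class SessionEvent(sensorId: String, totalValue: Double, events: Vector[EventX])

object Sessionizer {
  // The test data from the question
  val sample: Seq[EventX] = Seq(
    EventX("A", 0L, 1.1), EventX("A", 1000L, 3.2), EventX("B", 2000L, 8.4),
    EventX("A", 3000L, 2.7), EventX("B", 3000L, 0.2), EventX("A", 6000L, 3.1))

  /** Groups events per sensor and starts a new session whenever the gap since the
    * previous event of that sensor is >= gapMs. */
  def sessionize(events: Seq[EventX], gapMs: Long): Vector[SessionEvent] = {
    val perSensor = events.groupBy(_.sensorId).toVector.sortBy(_._1)
    val sessions = perSensor.flatMap { case (id, evs) =>
      val ordered = evs.sortBy(_.timestamp)
      val groups = ordered.foldLeft(Vector.empty[Vector[EventX]]) { (acc, e) =>
        acc.lastOption match {
          // Still inside the inactivity gap: extend the current session
          case Some(cur) if e.timestamp - cur.last.timestamp < gapMs => acc.init :+ (cur :+ e)
          // First event for this sensor, or the gap was reached: open a new session
          case _ => acc :+ Vector(e)
        }
      }
      groups.map(g => SessionEvent(id, g.map(_.value).sum, g))
    }
    // Order by each session's last event time, then by sensor
    sessions.sortBy(s => (s.events.last.timestamp, s.sensorId))
  }
}
```

Calling `Sessionizer.sessionize(Sessionizer.sample, 3000L)` yields three sessions in the same order as the expected output (A, B, then A again). In double arithmetic the first total comes out as 7.000000000000001 rather than exactly 7.0.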

How can I create (dynamic) session windows (or contexts) in Esper?

It can also be done like this:

CREATE CONTEXT sensorSessionCtx
  initiated by distinct(sensorId) EventX as ex 
  terminated by pattern [every (timer:interval(3 sec) and not EventX(sensorId=ex.sensorId))];

CONTEXT sensorSessionCtx
SELECT sensorId
,      SUM(value) AS totalValue
,      window(*)  AS events
FROM   EventX(sensorId=context.ex.sensorId)#keepall
OUTPUT WHEN TERMINATED;

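This version forgets sensorId values once their session ends, which is useful if the space of sensorId values is unbounded. With "partition by sensorId", on the other hand, the engine keeps track of every sensorId all the time.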
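The difference is easiest to see as per-key state. Below is a rough plain-Scala model of the `initiated by distinct(sensorId)` variant, not the Esper API; `Reading`, `Closed`, `ForgetfulSessions`, `advanceTime` and `onEvent` are hypothetical names. A sensor has an entry only while its session is open, and the entry is removed when the session closes. Every event restarts the 3-second inactivity deadline, roughly mirroring `every (timer:interval(3 sec) and not EventX(...))`. It keeps only a running sum and count, the analogue of dropping `#keepall` when the events themselves are not needed.

```scala
import scala.collection.mutable

// Plain-Scala model, not the Esper API; all names are hypothetical.
final case class Reading(sensorId: String, timestamp: Long, value: Double)
final case class Closed(sensorId: String, total: Double, count: Int, endsAt: Long)

/** At most one open session per sensor; a sensor's entry exists only while its
  * session is open, so keys are forgotten as soon as their session closes. */
final class ForgetfulSessions(gapMs: Long) {
  private final case class Open(total: Double, count: Int, lastTs: Long)
  private val open = mutable.Map.empty[String, Open]

  /** Number of sensors that currently have state in memory. */
  def trackedKeys: Int = open.size

  /** Moves the clock to `now`; every session idle for >= gapMs is closed and removed. */
  def advanceTime(now: Long): Vector[Closed] = {
    val expired = open.toVector.filter { case (_, s) => now - s.lastTs >= gapMs }.sortBy(_._1)
    expired.foreach { case (id, _) => open.remove(id) } // forget the key entirely
    expired.map { case (id, s) => Closed(id, s.total, s.count, s.lastTs + gapMs) }
  }

  /** Assumes e.timestamp is the current clock time (call advanceTime first).
    * Opens a session for a sensor with no state, or extends the open one;
    * either way the inactivity deadline restarts from this event. */
  def onEvent(e: Reading): Unit = {
    val next = open.get(e.sensorId) match {
      case Some(s) => Open(s.total + e.value, s.count + 1, e.timestamp)
      case None    => Open(e.value, 1, e.timestamp)
    }
    open.update(e.sensorId, next)
  }
}
```

Fed the test data (advancing the clock before each batch of events, as `t=t.plus(...)` does in EPL Online), advancing to 6000 closes both 'A' and 'B' and leaves `trackedKeys` at 0 until the later 'A' event opens a fresh entry.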

There's no need for "select * from SessionEvent", so I left it out.

If the output doesn't need to contain the events, you can remove #keepall (and the window(*) column), i.e.:

CONTEXT sensorSessionCtx
SELECT sensorId, SUM(value) AS totalValue
FROM   EventX(sensorId=context.ex.sensorId)
OUTPUT SNAPSHOT WHEN TERMINATED;

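Edit: the terminating condition should be "every (timer:interval(3 sec) and not EventX(sensorId=ex.sensorId))". The earlier suggestion was wrong because the initiating event never counts toward the terminating event/pattern (so "EventX -> every(...)" is wrong).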

To provide a session context in Esper, we need to create a Nested Context.

In this nested context, a Keyed Segment context is defined first, to 'split' the stream into separate per-user (or, as in the example, per-sensor) streams. We then define a second, Non-Overlapping context. The second context operates only within the first, keyed context, so all events in it have the same sensorId.

CREATE CONTEXT sensorSessionCtx
  CONTEXT sensorCtx
    PARTITION BY sensorId FROM EventX
, CONTEXT sessionCtx
    START EventX
    END pattern [every (timer:interval(3 sec) AND NOT EventX)];

With this context definition, the output for the example would be:

At: 2001-01-01 08:00:06.000
  Insert
    SessionEvent={sensorId='A', totalValue=7.000000000000001, events={[EventX={sensorId='A', timestamp=0, value=1.1},EventX={sensorId='A', timestamp=1000, value=3.2},EventX={sensorId='A', timestamp=3000, value=2.7}]}}
  Insert
    SessionEvent={sensorId='B', totalValue=8.6, events={[EventX={sensorId='B', timestamp=2000, value=8.4},EventX={sensorId='B', timestamp=3000, value=0.2}]}}

At: 2001-01-01 08:00:09.000
  Insert
    SessionEvent={sensorId='A', totalValue=3.1, events={[EventX={sensorId='A', timestamp=6000, value=3.1}]}}
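For comparison with the distinct-initiated variant, the shape of this nested context can be modeled in plain Scala as well. Again this is not Esper's API, and `Ev`, `Session`, `NestedSessions`, `knownKeys` and `openSessions` are hypothetical names. The outer map stands in for the keyed segment (`PARTITION BY sensorId`) and is never pruned. Each entry holds at most one open session, which starts on the first event and ends once 3 seconds pass without another event for that sensor. Like `window(*)` over `#keepall`, it keeps the events themselves.

```scala
import scala.collection.mutable

// Plain-Scala model of the nested context's shape, not the Esper API; names are hypothetical.
final case class Ev(sensorId: String, timestamp: Long, value: Double)
final case class Session(sensorId: String, events: Vector[Ev]) {
  def totalValue: Double = events.map(_.value).sum
}

/** Outer level: one partition per sensorId that is never removed (like PARTITION BY).
  * Inner level: at most one open session per partition (like the START/END context). */
final class NestedSessions(gapMs: Long) {
  private val partitions = mutable.Map.empty[String, Option[Vector[Ev]]]

  def knownKeys: Int = partitions.size
  def openSessions: Int = partitions.values.count(_.isDefined)

  /** Ends every session whose last event is >= gapMs old; the partition entry stays. */
  def advanceTime(now: Long): Vector[Session] = {
    val ended = partitions.toVector.sortBy(_._1).collect {
      case (id, Some(evs)) if now - evs.last.timestamp >= gapMs => Session(id, evs)
    }
    ended.foreach(s => partitions.update(s.sensorId, None))
    ended
  }

  /** Starts a session if none is open for this sensor (START EventX), else appends to it. */
  def onEvent(e: Ev): Unit = {
    val current = partitions.getOrElse(e.sensorId, None).getOrElse(Vector.empty[Ev])
    partitions.update(e.sensorId, Some(current :+ e))
  }
}
```

With the test data, advancing the clock to 6000 ends the 'A' and 'B' sessions but `knownKeys` stays at 2, which is the "keeps track of every sensorId" behavior of `partition by sensorId`. The second 'A' session ends when the clock reaches 9000, matching the `At: 2001-01-01 08:00:09.000` entry above.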

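Note, however, that when the internal engine timer has been disabled, the last window will only ever terminate once an event (any type will do) is received whose timestamp is greater than the last session event's timestamp plus the session gap. In the EPL Online tool this can be done by adding t=t.plus(10 seconds) after the last defined input event. This is especially important if you want to create any kind of unit test: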
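import com.espertech.esper.client.time.CurrentTimeEvent // pre-8.0 Esper client API
val finalEventTimestamp = Long.MaxValue - 1 //Note: Long.MaxValue won't trigger final evaluation!
// engine is the EPServiceProvider under test; this clock tick lets every open session terminate
engine.getEPRuntime.sendEvent(new CurrentTimeEvent(finalEventTimestamp))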