Session Windows (Context) in Esper
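I am looking for a way to group or window Esper events into dynamic windows, similar to what Apache Flink calls Session Windows.
I would like to create a Context per session, but so far I have not found a way to capture the session. Initial (non-working) example: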
CREATE SCHEMA EventX AS (sensorId string, timestamp long, value double);
CREATE SCHEMA SessionEvent AS (sensorId string, totalValue double, events EventX[]);
-- Unsure about Context definition
CREATE CONTEXT sensorSessionCtx
CONTEXT sensorCtx PARTITION BY sensorId FROM EventX,
CONTEXT sessionCtx INITIATED BY EventX TERMINATED BY pattern [every EventX -> (timer:interval(3 sec) and not EventX)];
CONTEXT sensorSessionCtx
INSERT INTO SessionEvent
SELECT sensorId
, SUM(value) AS totalValue
, window(*) AS events
FROM EventX#keepall
OUTPUT LAST WHEN TERMINATED;
@Name('Sessions') SELECT * FROM SessionEvent;
Some test data:
EventX = {sensorId='A', timestamp=0, value=1.1}
t=t.plus(1 seconds)
EventX = {sensorId='A', timestamp=1000, value=3.2}
t=t.plus(1 seconds)
EventX = {sensorId='B', timestamp=2000, value=8.4}
t=t.plus(1 seconds)
EventX = {sensorId='A', timestamp=3000, value=2.7}
EventX = {sensorId='B', timestamp=3000, value=0.2}
t=t.plus(3 seconds)
EventX = {sensorId='A', timestamp=6000, value=3.1}
Expected output:
SessionEvent={sensorId='A', totalValue=7.0, events={[EventX={sensorId='A', timestamp=0, value=1.1},EventX={sensorId='A', timestamp=1000, value=3.2},EventX={sensorId='A', timestamp=3000, value=2.7}]}}
SessionEvent={sensorId='B', totalValue=8.6, events={[EventX={sensorId='B', timestamp=2000, value=8.4},EventX={sensorId='B', timestamp=3000, value=0.2}]}}
SessionEvent={sensorId='A', totalValue=3.1, events={[EventX={sensorId='A', timestamp=6000, value=3.1}]}}
How can I create (dynamic) session windows (or contexts) in Esper?
It can also be done like this:
CREATE CONTEXT sensorSessionCtx
initiated by distinct(sensorId) EventX as ex
terminated by pattern [every (timer:interval(3 sec) and not EventX(sensorId=ex.sensorId))];
CONTEXT sensorSessionCtx
SELECT sensorId
, SUM(value) AS totalValue
, window(*) AS events
FROM EventX(sensorId=context.ex.sensorId)#keepall
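OUTPUT WHEN TERMINATED;
This forgets the "sensorId" value once the session has terminated, which is useful if your space of sensorId values is unbounded. With "partition by sensorId", by contrast, the engine keeps tracking every sensorId it has seen.
There is no need for "select * from SessionEvent", so I left it out.
If the output does not need to contain the events, you can remove #keepall, i.e.
CONTEXT sensorSessionCtx SELECT sensorId, sum(value) AS totalValue FROM EventX(sensorId=context.ex.sensorId) OUTPUT SNAPSHOT WHEN TERMINATED;
Edit: the termination should be "every (timer:interval(3 sec) and not EventX(sensorId=ex.sensorId))". The earlier suggestion was wrong because the initiating event never counts toward the terminating event/pattern (so EventX -> every(...) is wrong).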
To provide session contexts in Esper, we need to create a Nested Context.
In this nested context, a Keyed Segmented context is defined first, to split the stream into separate per-user (or, as in the example, per-sensor) streams. We then define a second, Non-Overlapping context. This second context runs only within the first, keyed context, so all events in it have the same sensorId.
CREATE CONTEXT sensorSessionCtx
CONTEXT sensorCtx
PARTITION BY sensorId FROM EventX
, CONTEXT sessionCtx
START EventX
END pattern [every (timer:interval(3 sec) AND NOT EventX)];
For the example, the output with this context definition would be:
At: 2001-01-01 08:00:06.000
Insert
SessionEvent={sensorId='A', totalValue=7.000000000000001, events={[EventX={sensorId='A', timestamp=0, value=1.1},EventX={sensorId='A', timestamp=1000, value=3.2},EventX={sensorId='A', timestamp=3000, value=2.7}]}}
Insert
SessionEvent={sensorId='B', totalValue=8.6, events={[EventX={sensorId='B', timestamp=2000, value=8.4},EventX={sensorId='B', timestamp=3000, value=0.2}]}}
At: 2001-01-01 08:00:09.000
Insert
SessionEvent={sensorId='A', totalValue=3.1, events={[EventX={sensorId='A', timestamp=6000, value=3.1}]}}
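To make the two-level grouping concrete, here is a minimal sketch in plain Java that does not use Esper at all. The class and method names (SessionGroupingSketch, sessionize) are made up for illustration, and it assumes the input is already ordered by timestamp. It keys events by sensorId first and then starts a new session whenever the gap since the previous event of that sensor is at least 3 seconds. Because it sums with plain double arithmetic, it also shows where a total like 7.000000000000001 (rather than the expected 7.0) comes from.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java sketch of "key first, then split on inactivity"; not the Esper API.
class SessionGroupingSketch {

    static final class Event {
        final String sensorId;
        final long timestamp;
        final double value;

        Event(String sensorId, long timestamp, double value) {
            this.sensorId = sensorId;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    static final class Session {
        final String sensorId;
        final List<Event> events = new ArrayList<>();
        double totalValue;

        Session(String sensorId) {
            this.sensorId = sensorId;
        }

        void add(Event e) {
            events.add(e);
            totalValue += e.value; // plain double addition, so rounding error can show up
        }

        long lastTimestamp() {
            return events.get(events.size() - 1).timestamp;
        }
    }

    // Assumes events are ordered by timestamp. Sessions are returned grouped by key.
    static List<Session> sessionize(List<Event> events, long gapMillis) {
        Map<String, List<Session>> byKey = new LinkedHashMap<>();
        for (Event e : events) {
            // Outer level: one list of sessions per sensorId.
            List<Session> sessions = byKey.computeIfAbsent(e.sensorId, k -> new ArrayList<>());
            Session current = sessions.isEmpty() ? null : sessions.get(sessions.size() - 1);
            // Inner level: a gap of gapMillis or more since the last event starts a new session.
            if (current == null || e.timestamp - current.lastTimestamp() >= gapMillis) {
                current = new Session(e.sensorId);
                sessions.add(current);
            }
            current.add(e);
        }
        List<Session> all = new ArrayList<>();
        for (List<Session> sessions : byKey.values()) {
            all.addAll(sessions);
        }
        return all;
    }

    public static void main(String[] args) {
        List<Event> input = new ArrayList<>();
        input.add(new Event("A", 0L, 1.1));
        input.add(new Event("A", 1000L, 3.2));
        input.add(new Event("B", 2000L, 8.4));
        input.add(new Event("A", 3000L, 2.7));
        input.add(new Event("B", 3000L, 0.2));
        input.add(new Event("A", 6000L, 3.1));
        for (Session s : sessionize(input, 3000L)) {
            System.out.println(s.sensorId + " total=" + s.totalValue + " events=" + s.events.size());
        }
    }
}
```

For the test data this yields three sessions: A with 3 events, A with 1 event, and B with 2 events. They come out grouped by key rather than ordered by close time.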
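Be aware, though, that when the internal engine timer has been disabled, the last window will only ever terminate if an event (of any type) is received with a timestamp greater than the last session event's timestamp plus the session gap. In the EPL online tool this can be done by adding t=t.plus(10 seconds) after the last defined input event. This is especially important if you want to write any kind of unit test: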
val finalEventTimestamp = Long.MaxValue - 1 //Note: Long.MaxValue won't trigger final evaluation!
engine.getEPRuntime.sendEvent(new CurrentTimeEvent(finalEventTimestamp))
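The point about time can be illustrated without Esper. Below is a small hypothetical model (ExternalClockSketch and its methods are invented for this sketch and are not Esper API) in which sessions close only when the clock is explicitly advanced, which is roughly the situation with the internal timer disabled. Recording the close time as "last event + gap" is a choice made in the sketch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of an externally clocked session tracker; not the Esper API.
class ExternalClockSketch {
    private final long gapMillis;
    // Open sessions: sensorId -> timestamp of the most recent event in that session.
    private final Map<String, Long> lastSeen = new LinkedHashMap<>();
    // Closed sessions, recorded as "sensorId@closeTime".
    private final List<String> closed = new ArrayList<>();

    ExternalClockSketch(long gapMillis) {
        this.gapMillis = gapMillis;
    }

    // Plays the role of a time event: nothing closes unless the clock is moved.
    void advanceTo(long now) {
        Iterator<Map.Entry<String, Long>> it = lastSeen.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> open = it.next();
            long deadline = open.getValue() + gapMillis;
            if (deadline <= now) {
                closed.add(open.getKey() + "@" + deadline);
                it.remove();
            }
        }
    }

    // An event first moves the clock to its own timestamp, then joins or opens a session.
    void onEvent(String sensorId, long timestamp) {
        advanceTo(timestamp);
        lastSeen.put(sensorId, timestamp);
    }

    List<String> closedSessions() {
        return closed;
    }

    int openSessions() {
        return lastSeen.size();
    }
}
```

Feeding the six test events through onEvent leaves the last A session open, with only A@6000 and B@6000 closed. A final advanceTo(16000), the equivalent of t=t.plus(10 seconds), closes it as A@9000.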