使用上下文在 60 分钟内聚合值时 esper 中的内存使用过多

Excessive memory usage in esper while aggregating values over 60 minutes using a context

我想汇总一天中每个小时的传感器值。时间信息来自事件流中的时间戳。为此,我创建了四个 EPL 语句:

  1. 第一个语句声明了一个上下文,该上下文按 sensorId
  2. 对事件进行分区
  3. 第二个语句检测特定传感器的事件流中一天中的小时数何时发生变化,并在发生这种情况时发送一个事件
  4. 第三条语句是上下文声明,它按 sensorId 和 hourOfDay 对事件进行分区。上下文的生命周期由第二个语句发布的事件控制。
  5. 最后一条语句使用语句 #3 中创建的上下文并计算传感器值的总和。

我每 2-6 秒从大约 300 个传感器接收事件。我将 esper 6.1.0 嵌入到堆大小为 8 GB 的 java 应用程序中。大约 15 分钟后,内存压力大到垃圾收集超速运转,导致应用程序无法使用。如果我删除最后一个 EPL 语句,我的应用程序将再次正常运行。

我对这种行为感到有些困惑。我以为当使用上下文时,esper不会在内存中累积事件。

所以这是我的问题:使用 EPL,我如何执行简单的聚合(例如求和、平均),使得内存消耗为 O(n) 而不是 O(n*t),其中 n 是传感器的数量,t 是时间长度window?

为了具体起见,这是我的 EPL 声明。

语句 1:

create context ctxPartitionById partition by sensorId from SensorEvent

声明 2:

context ctxPartitionById
INSERT INTO HourChanged
SELECT
  E.sensorId as sensorId,
  prev(1, E.occurredAtHour) AS lastHour,
  E.occurredAtHour AS currentHour
FROM SensorEvent#length(2) E
WHERE E.occurredAtHour != prev(1, E.occurredAtHour)

语句 3:

create context ctxPartitionByIdAndHour
  context PartitionedByKeys
    partition by
      sensorId, currentHour from HourChanged,
      sensorId, occurredAtHour from SensorEvent,
  context InitiateAndTerm
    initiated by HourChanged as e1
    terminated by HourChanged(sensorId=e1.sensorId and lastHour=e1.currentHour)

语句 4:

context ctxPartitionByIdAndHour
SELECT
  E.sensorId,
  E.occurredAtHour,
  SUM(E.value) AS sensorValueSum
FROM SensorEvent E
output last when terminated

sensorId 是 java class 的实例,它实现了 Comparable.

occurredAtHour 是一个 java.util.Date 实例。它的值是一个向下舍入到小时的时间戳。

valuedouble.

键控上下文不会忘记分区。所以你这样做 "partition by sensorId, currentHour from HourChanged, sensorId, occurredAtHour from SensorEvent" 并且因为这是一个键控上下文(在顶部)分区键保持不变,因为引擎认为你总是在寻找同一个键的下一个 HourChanged 事件。这意味着引擎会记住 "sensorId" 和 "currentHour" 的每个可能组合。所以这可能不是你想要的。相关提示位于解决方案模式页面。

对于语句 4,输出速率限制会导致事件保留在内存中,因为默认情况下引擎会在终止时计算输出。请注意您的 select 子句如何 selects 事件属性。当您 select 事件属性时,这些来自实际事件。相反,您可以简单地使其成为一个完全聚合的查询,这样它就没有理由保留任何事件。

context ctxPartitionByIdAndHour
SELECT last(E.sensorId), last(E.occurredAtHour), SUM(E.value) AS sensorValueSum
FROM SensorEvent E
output last when terminated

您可以在解决方案模式中做类似的事情。 http://espertech.com/esper/solution_patterns.php#absence-11 http://espertech.com/esper/solution_patterns.php#semantic-window-2

@Hint('enable_outputlimit_opt') 添加到第四个也是最后一个 EPL 语句可以解决问题。

显然,当使用 output last 时,esper 会在内存中保留事件,除非存在此特定提示。请参阅 esper 文档中的 5.7.3. Runtime Considerations