处理数据流的日志记录

Handling logging over data stream

我有一个 EC2 实例,它的数据流使用事件发射器来处理数据。例如。

stream.on('new event', function doSomething(event){ do more stuff...})

这个数据流每秒可能有数万个事件,我想以高效的方式记录这些事件的处理。换句话说,每次有新事件发生时,我都不愿意发送日志条目。

因此,我想我应该批量发送日志。例如

let logArray = [];
function sendToLogs(logs) {\** send stuff *\}

stream.on('new event', function doSomething(event){ 
  \do some stuff

  logArray.push({newLog: event})
  if (logArray.length >= 500) {
     sendToLogs(logArray)
     logArray = [];
  }
})

但是,我担心有这么多事件同时进入,上面的代码可能会导致不稳定的行为。我在本地日志记录中看到了这一点:这个数组的长度非常显着地跳跃并且可以同时对不同的事件具有相同的值。

此外,使用 cloudwatch 日志需要我在对日志记录函数的不同调用之间传递 'sequenceTokens'。如果两个事件同时触发日志记录条件,事情就会变得很奇怪。 (即使我分别记录每个事件也会存在这个问题。)

我应该如何处理这种数据流的日志记录?

我会将日志记录分离到一个或多个单独的进程中。您的主应用程序将使用 "fire and forget" 类型的逻辑将日志记录消息放在 SQS 队列中。然后,您的日志记录应用程序将读取队列并写入您选择的日志。优点是 activity 的突发会被队列吸收。队列的长度没有直接限制,因此它应该能够处理。实际上,您不再排队消息,SQS 是。

此外,如果队列增长超过您的预期,您可以使用多个日志记录应用程序来处理负载。

缺点是:

  1. 您需要编写这个单独的进程来处理到 CloudWatch 或任何地方的日志记录。
  2. 您的日志将不是实时的。在您的主应用程序记录日志和将日志消息放入 CloudWatch 之间至少会有一些延迟。使用额外的日志记录过程,您应该关闭,但这不是保证。