基于时间跨度的 AWS Kinesis 流聚合

AWS Kinesis Stream Aggregating Based on Time Spans

我目前有一个 Kinesis 流,其中填充了 JSON 形式的消息:

{"datetime": "2017-09-29T20:12:01.755z", "payload":"4"}
{"datetime": "2017-09-29T20:12:07.755z", "payload":"5"}
{"datetime": "2017-09-29T20:12:09.755z", "payload":"12"}
etc...

我在这里试图完成的是根据时间块聚合数据。在这种情况下,我想对 10 分钟跨度的平均值进行分组。例如,从 12:00 > 12:10,我想对 payload 值进行平均并保存为 12:10 值。

例如,上述数据将产生:

Datetime: 2017-09-29T20:12:10.00z
Average: 7

我正在考虑的方法是在服务级别使用缓存,然后使用某种方式来跟踪时间。如果消息进入下一个 10 分钟时间跨度,我会平均缓存数据,将其存储到数据库中,然后删除该缓存值。

目前,我的服务每分钟会收到 20,000 条消息,预计未来会收到更多消息。我对如何实现它以确保我从 Kinesis 获得那 10 分钟时间段内的所有值有点困惑。那些比较熟悉 Kinesis 和 AWS 的人,有没有简单的方法来解决这个问题?

这样做的原因是为了缩短对大时间跨度(例如 1 年)数据的查询时间。我不想获取数百万个值,而是一些聚合值。

编辑:

我必须同时跟踪许多不同的平均值。例如,上面的 JSON 可能只属于一个 'set',例如每个城市在 10 分钟时间跨度内的平均温度。这需要我跟踪每个时间跨度的每个城市的平均值。

Toronto (12:01 - 12:10): average_temp
New York (12:01 - 12:10): average_temp
Toronto (12:11 - 12:20): average_temp
New York (12:11 - 12:20): average_temp
etc...

这可能适用于全球任何城市。如果新温度到达多伦多,并且它与 12:01 - 12:10 时间跨度有关,我必须重新计算并存储该平均值。

这就是我要做的。感谢您提出有趣的问题。

Kinesis Streams --> Lambda(事件插入器)--> DynamoDB(流)--> Lambda(计数和值增量器)--> DynamoDB(流)--> 平均值(更新器)

DynamoDB Table 结构:

{ 
Timestamp: 1506794597
Count: 3
TotalValue: 21
Average: 7
Event{timestamp}-{guid}: { event }
}

timestamp -- timestamp of the actual event
guid -- avoid any collision on a timestamp that occurred at same time
Event{timestamp}-{guid} -- This should be removed by (count and value incrementor)

如果该时间戳的第四条记录到达,

获取接近 10 分钟的时间跨度,增加计数,增加总值。永远不要读取值和增量,这将导致错误,除非你使用强一致性(这是非常昂贵读取)。而是使用 原子增量 执行增量操作。

http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithItems.html#WorkingWithItems.AtomicCounters

从上面创建 DynamoDB 流 table,监听另一个 lambda,现在计算平均值并更新值。

计算平均值时,不要从 table 执行读取。相反,数据将在流中可用,您只需要计算平均值并更新它。 (覆盖之前的平均值)。

这将适用于任何规模且具有高可用性。

希望对您有所帮助。

编辑 1:

由于 OP 不熟悉 AWS 服务,

Lambda 文档:

https://aws.amazon.com/lambda/

DynamoDB 文档:

https://aws.amazon.com/dynamodb/

用于解决方案的 AWS 云服务。