如何在 AWS 中有效地聚合数十亿条单独记录中的数据?

How to efficiently aggregate data in billions of individual records in AWS?

在高/理论水平上,我确切地知道我想要构建的架构类型及其工作方式,但我试图使用 AWS 服务尽可能便宜地构建它并且我不熟悉AWS 的产品让我 运行 转了一圈。

数据

我们运行 一个视频流媒体平台。在繁忙的夜晚,我们有大约 100 个同步直播流,观众超过 30,000 人。我们预计这个数字在未来几年内会上升到 100,000。一场直播平均持续 2 小时。

我们每 10 秒从播放器发送一次心跳,其中包含有关观看者的信息——他们观看了多少数据、缓冲了多少数据、流式传输的质量等。

这些心跳直接发送到 AWS Kinesis 端点。

最后,我们希望将所有过去的消息保留至少 5 年(希望更长),以便我们可以查看历史分析。

一些粗略计算表明,五年后我们将有 0.1 * 60 * 60 * 2 * 100000 * 365 * 5 = 131 十亿 条心跳消息。

我们的旧管道

我们的旧系统只有一个 Kinesis 消费者。聚合数据存储在 DynamoDB 中。每当消息到达时,我们都会从 DynamoDB 中读取记录,更新记录,然后将新记录写回。这个读取-更新-写入循环限制了我们处理消息的速度,并使得每条传入的消息都依赖于它之前的消息,因此它们无法并行处理。

此设置的部分原因是我们的消息架构从一开始就没有很好地设计。我们发送发送消息的时间戳,但我们不发送“自上次心跳以来观看的视频量”。因此,为了计算总观看时间,我们需要查找该播放器发送的最后一条心跳消息,减去时间戳,然后添加 that 值。许多其他指标也存在类似问题。

我们的新管道

我们已经开始 运行 解决缩放问题。在我们的高峰时段,在等待处理积压的消息时,分析可能会延迟多达四个小时。如果此积压达到 24 小时,Kinesis 将开始删除数据。所以我们需要修复我们的管道以消除对过去消息的依赖,以便我们可以并行处理它们。

第一部分是更新玩家发送的消息。我们的新规范仅包含可以简单求和而无需减法的指标。因此,我们可以继续添加“查看时间”指标,例如,而不考虑过去的消息。

第二部分是确保 Kinesis 永不备份。我们在未经处理的情况下尽快将原始消息转储到 S3 (Kinesis Data Fire Hose),以便我们可以在闲暇时对它们进行 运行ch 分析。

最后,我们现在想要尽快从这些分析中实际提取信息。这是我遇到障碍的地方。

我们想回答的问题

由于这是一个分析管道,我们的问题主要围绕过滤这些消息,然后聚合剩余消息的字段(可能,实际上可能是分组)。例如:

How many Android users watched last night's stream in HD? (FILTER by stream and OS)

What's the average bandwidth usage among all users? (SUM and COUNT, with later division of the final aggregates which could be done on the dashboard side)

What percent of users last year were on any Apple device (iOS, tvOS, etc)? (COUNT, grouped by OS)

What's the average time spent buffering among Android users for streams in the past year? (a mix of all of the above)

选项

理想架构

这是一个大数据问题。大数据问题的通用解决方案是“不要将数据用于查询,将查询用于数据”。如果这些消息分布在 100 个小型存储节点上,那么每个节点都可以过滤、求和和计算它们持有的数据子集,并将这些聚合传递回中央节点,中央节点对总和和计数求和。如果每个节点只对数据集的 1/100 进行操作,那么这种处理在理论上可能会非常快。

我的困惑

虽然我对“理想”架构有理论上的理解,但我不清楚 AWS 是否以这种方式工作或如何构建一个像这样运行良好的系统。

最后,除了希望获得尽可能“实时”的分析结果(理想情况下,我们希望在 1 分钟内知道有人加入或离开流),我们希望仪表板能够快速加载。等待 30 秒才能看到现场观众的数量是可怕的。仪表板应在 2 秒或更短时间内加载(理想情况下)

计划是使用 QuickSight 创建仪表板(我们的旧系统有一个 hack-y Django 应用程序可以从我们的 DynamoDB 聚合中读取 table,但我想避免为人们创建更多代码维护)

我希望您能从广泛的专家那里得到很多不同的答案和意见。由于存在很多变数,因此可能没有单一的最佳答案。让我根据我在该领域的经验给你最好的建议。

Kinesis 到 S3 是一个好的开始,不移动超过需要的数据是正确的理念。

您没有提到 Kinesis Data Analytics,这可能是满足您某些需求的解决方案。最适合询问有关数据馈送中正在发生的事情的问题。较长时间范围的问题更适合您提到的工具。如果您对过去 10 分钟(左右)发生的事情不太感兴趣,最好忽略。

S3 组织将是直接对那里的数据执行任何分析的关键。您提到镶木地板格式很好,但分区功能更强大。将 S3 数据组织成“天”或“小时”的数据,并据此设置分区可以大大加快任何受限于所需时间的查询(不要读取不需要的内容).

关于 S3 的重要安全说明 - S3 是一个对象存储,因此您引用的每个对象都有开销。无论您采用何种解决方案,将许多小对象(10,000 多个)视为一组数据都会很慢。在继续使用任何解决方案之前,您需要解决此问题。您会看到在 S3 中查找对象需要超过 0.5 秒,但如果文件很小,传输时间几乎为零。现在将 0.5 秒乘以你拥有的所有对象,看看读取它们需要多长时间。这不是您选择的下游工具的功能,而是您拥有的 S3 组织的功能。作为大数据解决方案一部分的 S3 对象的大小至少应为 100M,以免对象查找时间受到很大影响。 parquet 或 CSV 文件的选择是静音的,没有首先解决对象大小和分区问题。

Athena 适合偶尔查询,尤其是在日期范围有限的情况下。这是您期望的查询模式吗?正如您所说的“将计算移至数据”,但如果您使用 Athena 进行需要使用大部分数据的大型横截面分析,则每次执行此查询时,您只是将数据移至 Athena。不要在数据存储时停止考虑数据移动 - 还要考虑数据移动以进行分析。

所以一个大问题是需要多少数据以及支持分析工作负载和 BI 功能的频率?这就是您要寻找的最终结果。如果经常需要很大比例的数据,那么像 Redshift 这样将数据加载到磁盘的仓库解决方案是正确的答案。 Redshift 的数据加载时间非常快,因为它从 S3 并行加载数据(您看到 S3 是一个集群,Redshift 是一个集群,可以完成并行加载)。如果将所有数据加载到 Redshift 中是您所需要的,那么加载时间不是您主要关心的问题 - 成本才是。功能强大的工具,价格相匹配。新的 RA3 实例类型大大降低了大数据集群的曲线,因此有可能。

您没有提到的另一个工具是 Redshift Spectrum。这将几种对您很重要的强大技术结合在一起。首先是 Redshift 的强大功能,它能够选择通常用于您的数据大小的较小集群大小。 S3 过滤和聚合技术允许 Spectrum 对 S3 中的数据执行操作(是的,查询的初始计算操作在 S3 内部执行,可能会大大减少移动到 Redshift 的数据)。如果您的查询模式支持 S3 中的这种数据缩减,那么数据移动将很小,Redshift 集群也可以很小(便宜)。对于像您这样的 IoT 解决方案,这可能是一个强大的折衷点,因为不需要复杂的数据模型和连接。

您提出胶合和转换为镶木地板。这样做可能很好,但正如我之前提到的,在 S3 中对数据进行分区通常要强大得多。 parquet 的价值会随着数据宽度的增加而增加。 Parquet 是一种柱状格式,因此如果只需要“列”的一个子集,它会很有优势。缺点是转换 time/cost 和人类可读性的丧失(在调试期间可能会很大)。

EMR 是您提到的另一种选择,但我通常建议客户不要使用 EMR,除非他们需要它为分析带来的灵活性并且他们有能力很好地使用它。没有这些 EMR 往往是一个不必要的成本沉没。

如果这真的是一个大数据解决方案,那么 RDS(和 Aurora)不是好的选择。它们专为事务性工作负载而不是分析而设计。数据大小和分析将不适合或不符合成本效益。

space 中的另一个工具是 S3 Select。不太可能是您正在寻找的东西,但需要记住的东西存在并且可以成为工具箱中的工具。

如果存在基于某些因素的可变需求,混合解决方案在此 space 中很常见。一个常见的“是一天中的时间”- 没有人在凌晨 3 点 运行 发布大量报告,因此所需的性能要少得多。另一个是用户组——一些组需要简单的分析,而另一些则需要更强大的功能。另一个因素是数据的及时性——每个人都需要“秒级”信息还是每天的信息就足够了?试图拥有一个可以为每个人做所有事情的工具,通常是通往昂贵、超大解决方案的途径。

由于 Redshift Spectrum 和 Athena 可以指向相同的 S3 数据(组织良好,因为两者都会受益),这两种工具可以在相同的数据上共存。此外,Redshift 非常适合筛选大量数据,非常适合生成汇总表,然后将它们(在分区镶木地板中)写入 S3 以供 Athena 等工具使用。所有这些云服务都可以 运行 按计划进行,这包括 Redshift 和 EMR(Athena 是按需查询),因此它们不需要一直 运行。 Redshift with Spectrum 可以每天 运行 几个小时来执行深度分析和汇总数据以写入 S3。您的数据科学家也可以将 Redshift 用于他们的核心工作,而 Athena 支持使用每日摘要数据和 Kinesis Data Analytics 作为源的仪表板。

最后,您对仪表板提出了 2 秒的要求。这对于由 Redshift 或 Athena 支持的 Quicksight 来说绝对是可能的,但对于任意复杂/数据密集型查询来说是无法满足的。为此,您需要引擎具有足够的马力来生成相关数据。具有本地数据存储的 Redshift 可能是最快的(Redshift Spectrum 在某些情况下在 S3 中完成一些数据 p运行ing 胜出)而 Athena 是最弱/最慢的。但是,如果工作量小,功能并不重要 - 查看您的查询工作量将是一个巨大的决定因素。最快的方法是将所需数据加载到 Quicksight 存储 (SPICE) 中,但这是数据的另一个本地化/汇总版本,因此及时性再次成为一个因素(更新频率)。

基于设计类似的系统以及对您的需求的一系列猜测,我建议您:

  1. 修复您的对象大小(可以配置 Kineses 来执行此操作)
  2. 按天对数据进行分区
  3. 设置一个小型 Redshift 集群 (4 X dc2.large) 并使用 Spectrum 源寻址数据
  4. 将 Quicksight 连接到 Redshift
  5. 衡量性能(和成本)并与要求进行比较(可能会有差距)
  6. 调整解决方案(S3、Athena、SPICE 等的汇总表)以实现目标

另一种方法是聘请以前设置过此类系统的人,让他们详细审查要求并提出较少的“基于猜测”的建议。

我会研究德鲁伊。不是 AWS 产品,但可以在 AWS 上轻松运行,与 S3 和 Kinesis 良好集成。

  1. 能够高速从 Kinesis 读取数据,并立即提供数据以供查询。还可以在读取数据时展平和转换数据。
  2. 能够在摄取期间执行 rollups/aggregation/compaction(并以异步方式进一步减少数据)。从你写的内容来看,在我看来,它可以很容易地将数据库中的行数减少一个非常大的因素。
  3. 能够快速查询,使用标准 SQL。
  4. 智能分区数据以仅扫描相关日期。

缺点是您需要保持集群正常运行,运行 用于摄取和查询。它具有很好的可扩展性,因此您可以从小处着手。 从好的方面来说——您没有使用 10 种不同的技术 (Athena/Glue/EMR/etc.)

您可能需要考虑联系 Imply,这可以简化部署。

许多公司通常采用的方法是他们在 athena 或 bigquery(或其他一些分布式 sql 环境)中进行繁重的工作 -> 将中间结果聚合到多个索引+分区 postgres/mysql/redshift/clickhouse tables,然后连接他们的 API 以读取那些 tables。当然,这很好用,除了随着中间聚合数据量的增加,table 索引增长并且累积和或排序等问题变得越来越低效。

鉴于您手头的问题,我认为您可以通过 AWS Lambda 获得很多帮助。 AWS Lambda 提供了一种非常可行的无服务器方法来解决大粒度数据问题(如果使用得当)。例如,假设您的管道按 YYYYMMMDDHHMM 对传入流进行分区,并将其存储到某个 S3 路径中,该路径有一个 Lambda 监听它(作为触发函数),那么您的数据摄取 + 聚合几乎变成了同时进行的过程。一分钟后,同一 Lambda 函数的新实例将负责将数据登陆分区 YYYYMMMDDHHMM+1。因此,通过这种方式,您可以 运行 数以千计的并发进程与大量 Lambda 函数并行执行相同的操作。当然,这是一张粗略的图,但我认为它可以提供很大的帮助。