Athena 在 S3 上查询数据的替代方案

Alternatives for Athena to query the data on S3

我在 S3 上有大约 300 GB 的数据。让我们说数据看起来像:

## S3://Bucket/Country/Month/Day/1.csv 

S3://Countries/Germany/06/01/1.csv 
S3://Countries/Germany/06/01/2.csv 
S3://Countries/Germany/06/01/3.csv 

S3://Countries/Germany/06/02/1.csv 
S3://Countries/Germany/06/02/2.csv 

我们正在对数据进行一些复杂的聚合,并且由于一些国家/地区的数据大而一些国家/地区的数据小,AWS EMR 没有意义,因为小国一旦完成,资源就被浪费了,而大国则长期保留 运行 。因此,我们决定将 AWS Batch(Docker 容器)Athena 一起使用。一项工作需要每个国家/地区一天的数据。

现在大约有 1000 个作业 一起启动,当它们查询 Athena 读取数据时,容器失败,因为它们到达 Athena 查询限制

因此,我想知道解决这个问题的其他可能方法是什么?我应该使用 Redshift 集群,在那里加载所有数据并且所有容器查询到 Redshift 集群,因为它们没有查询限制。但是它很贵,而且需要很多时间来完成。

另一种选择是读取 EMR 上的数据并在其上使用 Hive 或 Presto 来查询数据,但是它将再次达到查询限制。

如果有人能提供更好的选择来解决这个问题,那就太好了。

您可以为此目的使用红移光谱。是的,它有点贵,但它是可扩展的,非常适合执行复杂的聚合。

一种解决方案是不要同时启动所有作业,而是调整它们以保持在并发限制内。我不知道这对于您使用的工具来说是容易还是困难,但如果您同时向 Athena 提出所有查询,它永远不会很好地解决问题。 编辑:看起来你应该能够限制 Batch 中的作业,参见 (默认情况下 Athena 允许 25 个并发查询,因此尝试 20 个并发作业以获得安全边际– 但也将重试逻辑添加到启动作业的代码中)。

另一种选择是不将其作为单独的查询来执行,而是尝试将所有内容合并为更少的查询,甚至是单个查询 - 通过按国家/地区和日期分组,或者生成所有查询并将它们与UNION ALL。但是,如果不了解更多关于数据和查询的信息,很难说这是否可能。无论如何,您可能必须 post 处理结果,如果您只是按有意义的内容排序,那么在查询具有 运行 之后将结果拆分为必要的部分并不是很难。

使用 Redshift 可能不是解决方案,因为听起来您每天只这样做一次,而且您不会经常使用集群。雅典娜是一个更好的选择,你只需要更好地处理限制。

由于我对您的用例的了解有限,我认为使用 Lambda 和 Step Functions 是比 Batch 更好的方法。使用 Step Functions,您将拥有一个启动 N 个查询的函数(其中 N 等于您的并发限制,如果您没有要求引发它,则为 25),然后是一个轮询循环(查看示例以了解如何这样做)检查已完成的查询,并启动新查询以将 运行ning 查询的数量保持在最大值。当所有查询都是 运行 时,最终函数可以在完成所有操作后触发您需要 运行 的任何工作流程(或者您可以 运行 在每次查询之后)。

Lambda 和 Step Functions 的好处是您无需为闲置资源付费。使用 Batch,您将为只等待 Athena 完成而什么都不做的资源付费。例如,与 Redshift 相比,Athena 具有异步 API,您可以 运行 Lambda 函数 100 毫秒开始查询,然后每隔几秒(或几分钟)检查 100 毫秒是否已完成,然后再花 100 毫秒左右完成。它几乎可以保证低于 Lambda 免费套餐。

据我所知,Redshift SpectrumAthena 成本相同。您不应该将 Redshift 与 Athena 进行比较,它们有不同的用途。但首先我会考虑解决您的数据倾斜问题。既然您提到了 AWS EMR,我假设您使用的是 Spark。要处理大分区和小分区,您需要按月或其他一些均匀分布的方式对数据集进行重新分区 value.Or 您可以使用月份和国家/地区进行分组。你明白了。

据我了解,您只需将查询发送到 AWS Athena 服务,在所有聚合步骤完成后,您只需从 Athena 保存结果的 S3 存储桶中检索生成的 csv 文件,因此您最终得到 1000 个文件(一个每项工作)。但问题是并发 Athena 查询的数量,而不是总执行时间。

您是否考虑过使用 Apache Airflow for orchestrating and scheduling your queries. I see airflow as an alternative to a combination of Lambda and Step Functions, but it is totally free. It is easy to setup on both local and remote machines, has reach CLI and GUI for task monitoring, abstracts away all scheduling and retrying logic. Airflow even has hooks to interact with AWS services. Hell, it even has a dedicated operator 向 Athena 发送查询,因此发送查询就像:

from airflow.models import DAG
from airflow.contrib.operators.aws_athena_operator import AWSAthenaOperator
from datetime import datetime

with DAG(dag_id='simple_athena_query',
         schedule_interval=None,
         start_date=datetime(2019, 5, 21)) as dag:

    run_query = AWSAthenaOperator(
        task_id='run_query',
        query='SELECT * FROM  UNNEST(SEQUENCE(0, 100))',
        output_location='s3://my-bucket/my-path/',
        database='my_database'
    )

我将它用于类似类型的 daily/weekly 任务(使用 CTAS 语句处理数据),这些任务超过了对多个并发查询的限制。

有很多博客文章和文档可以帮助您入门。例如:

  • Medium post:自动执行 AWS Athena 查询并使用 Airflow 在 S3 周围移动结果。
  • Airflow 安装完整指南,link 1 and link 2

您甚至可以设置 integration with Slack 在查询以成功或失败状态终止时发送通知。

然而,我面临的是只有 4-5 个查询同时实际执行,而其他所有查询都在闲置。