Amazon Redshift:`ActiveStatementsExceededException`(如何同时执行 INSERT)

Amazon Redshift: `ActiveStatementsExceededException` (how to do INSERTs concurrently)

我有一个通过 Lambda 将数据推送到 Amazon Redshift 的 Kinesis 集群。

目前我的 lambda 代码看起来像这样:

client = boto3.client('redshift-data')
for tx in txs:
    query = # prepare an INSERT query here

    resp = client.execute_statement(
        ClusterIdentifier=redshift_cluster_id,
        Database=redshift_db,
        DbUser=redshift_user,
        Sql=query
    )

问题在于,一旦我尝试扩大 kinesis(更多分片)或 lambda(从单个分片进行并发处理)- 我得到这个:

[ERROR] ActiveStatementsExceededException: An error occurred (ActiveStatementsExceededException) when calling the ExecuteStatement operation: Active statements exceeded the allowed quota (200).
Traceback (most recent call last):
  File "/opt/python/lib/python3.8/site-packages/codeguru_profiler_agent/aws_lambda/profiler_decorator.py", line 52, in profiler_decorate
    return function(event, context)
  File "/opt/python/lib/python3.8/site-packages/codeguru_profiler_agent/aws_lambda/lambda_handler.py", line 91, in call_handler
    return handler_function(event, context)
  File "/var/task/lambda_function.py", line 71, in lambda_handler
    resp = client.execute_statement(
  File "/var/runtime/botocore/client.py", line 386, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/var/runtime/botocore/client.py", line 705, in _make_api_call
    raise error_class(parsed_response, operation_name)

我从 AWS 文档中了解到,这意味着我正在尝试 运行 并行处理太多 execute_statement

我该如何解决这个问题?通过批处理记录并将它们全部插入在一起来使用 Redshift 的唯一方法是什么?

你代码中的注释让我停顿了一下——“query = # prepare an INSERT query here”。这似乎暗示您正在将 S3 数据读入 Lambda 并将该数据插入 Redshift。如果是这样,这不是一个好的模式。

首先,Redshift 希望通过 COPY(或 Spectrum 或...)而不是通过 INSERT 将数据带入集群。这将在 Redshift 中产生管理事务的问题,并造成巨大的浪费或磁盘 space / 需要 VACUUM。将数据放入 Redshift 的 INSERT 方法是一种反模式,即使是中等大小的数据也不应该这样做。

更普遍的担忧是数据移动阻抗不匹配。 Kinesis 是许多独立的数据流和生成小文件的代码。 Redshift 是一个处理大型数据段的大型数据库。以错过其设计目标的方式不匹配这些工具将使它们中的任何一个表现非常差。您需要通过将 S3 批处理到 Redshift 来满足数据要求。这意味着在单个 COPY 命令中将许多 S3 文件复制到 Redshift 中。这可以通过清单或 S3 中的“目录”结构来完成。 “从 S3 路径复制所有内容……”这个将数据复制到 Redshift 的过程可以 运行 每个时间间隔(2 或 5 或 10 分钟)。因此,您希望 Kinesis Lambdas 组织 S3 中的数据(或添加到清单),以便可以收集“批次”S3 文件以进行 COPY 执行。通过这种方式,可以一次将大量 S3 文件导入 Redshift(其首选数据大小),并且还会大大减少执行 API 调用。

现在,如果您设置了非常大的 Kinesis 管道并且数据非常大,则需要考虑另一个数据移动“偏好”。这仅在您每分钟移动大量数据时才重要。此额外偏好适用于 S3。 S3 是对象存储意味着“查找”请求的对象键会占用大量时间。大约 0.5 秒。因此,读取一千个 S3 对象将需要 500(总共)500 秒的密钥查找时间。 Redshift 将并行向 S3 发出请求,集群中的每个切片一个,因此其中一些时间是并行的。如果正在读取的文件大小为 1KB,则在 S3 查找完成后,数据的数据传输大约需要 1.25 秒。全部的。同样,这次是并行的,但您可以看到在查找和传输上花费了多少时间。为了从 S3 中获得最大带宽以读取许多文件,这些文件的大小需要为 1GB(根据我的经验,100MB 是可以的)。您可以看到,如果您每分钟要将数百万个文件从 Kinesis 摄取到 Redshift,您将需要一个过程将许多小文件组合成更大的文件,以避免 S3 的这种危险。由于您使用 Lambda 作为 Kinesis reader 我希望您还没有达到这个数据速率,但如果您希望扩展到非常大的规模,那么关注这个问题是件好事。

仅仅因为工具具有高带宽并不意味着它们可以通过管道连接在一起。带宽有多种形式。