How to use Data Pipeline to export a DynamoDB table that has on-demand provisioning

I used to export DynamoDB tables to files using the Data Pipeline template named Export DynamoDB table to S3. I recently switched all of my DynamoDB tables to on-demand provisioning, and the template no longer works. I'm fairly certain this is because the old template specifies a percentage of DynamoDB throughput to consume, which is meaningless for on-demand tables.

I tried exporting the old template to JSON, removing the references to throughput percentage consumption, and creating a new pipeline from it. However, this didn't work.

Can anyone suggest how to convert an old-style pipeline script with provisioned throughput into one that works against an on-demand table?

Here is my original, working script:

{
  "objects": [
    {
      "name": "DDBSourceTable",
      "id": "DDBSourceTable",
      "type": "DynamoDBDataNode",
      "tableName": "#{myDDBTableName}"
    },
    {
      "name": "EmrClusterForBackup",
      "coreInstanceCount": "1",
      "coreInstanceType": "m3.xlarge",
      "releaseLabel": "emr-5.13.0",
      "masterInstanceType": "m3.xlarge",
      "id": "EmrClusterForBackup",
      "region": "#{myDDBRegion}",
      "type": "EmrCluster"
    },
    {
      "failureAndRerunMode": "CASCADE",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "role": "DataPipelineDefaultRole",
      "scheduleType": "ONDEMAND",
      "name": "Default",
      "id": "Default"
    },
    {
      "output": {
        "ref": "S3BackupLocation"
      },
      "input": {
        "ref": "DDBSourceTable"
      },
      "maximumRetries": "2",
      "name": "TableBackupActivity",
      "step": "s3://dynamodb-emr-#{myDDBRegion}/emr-ddb-storage-handler/2.1.0/emr-ddb-2.1.0.jar,org.apache.hadoop.dynamodb.tools.DynamoDbExport,#{output.directoryPath},#{input.tableName},#{input.readThroughputPercent}",
      "id": "TableBackupActivity",
      "runsOn": {
        "ref": "EmrClusterForBackup"
      },
      "type": "EmrActivity",
      "resizeClusterBeforeRunning": "true"
    },
    {
      "directoryPath": "#{myOutputS3Loc}/#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')}",
      "name": "S3BackupLocation",
      "id": "S3BackupLocation",
      "type": "S3DataNode"
    }
  ],
  "parameters": [
    {
      "description": "Output S3 folder",
      "id": "myOutputS3Loc",
      "type": "AWS::S3::ObjectKey"
    },
    {
      "description": "Source DynamoDB table name",
      "id": "myDDBTableName",
      "type": "String"
    },
    {
      "default": "0.25",
      "watermark": "Enter value between 0.1-1.0",
      "description": "DynamoDB read throughput ratio",
      "id": "myDDBReadThroughputRatio",
      "type": "Double"
    },
    {
      "default": "us-east-1",
      "watermark": "us-east-1",
      "description": "Region of the DynamoDB table",
      "id": "myDDBRegion",
      "type": "String"
    }
  ],
  "values": {
    "myDDBRegion": "us-east-1",
    "myDDBTableName": "LIVE_Invoices",
    "myDDBReadThroughputRatio": "0.25",
    "myOutputS3Loc": "s3://company-live-extracts/"
  }
}

And here is my updated attempt, which failed:

{
  "objects": [
    {
      "name": "DDBSourceTable",
      "id": "DDBSourceTable",
      "type": "DynamoDBDataNode",
      "tableName": "#{myDDBTableName}"
    },
    {
      "name": "EmrClusterForBackup",
      "coreInstanceCount": "1",
      "coreInstanceType": "m3.xlarge",
      "releaseLabel": "emr-5.13.0",
      "masterInstanceType": "m3.xlarge",
      "id": "EmrClusterForBackup",
      "region": "#{myDDBRegion}",
      "type": "EmrCluster"
    },
    {
      "failureAndRerunMode": "CASCADE",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "role": "DataPipelineDefaultRole",
      "scheduleType": "ONDEMAND",
      "name": "Default",
      "id": "Default"
    },
    {
      "output": {
        "ref": "S3BackupLocation"
      },
      "input": {
        "ref": "DDBSourceTable"
      },
      "maximumRetries": "2",
      "name": "TableBackupActivity",
      "step": "s3://dynamodb-emr-#{myDDBRegion}/emr-ddb-storage-handler/2.1.0/emr-ddb-2.1.0.jar,org.apache.hadoop.dynamodb.tools.DynamoDbExport,#{output.directoryPath},#{input.tableName}",
      "id": "TableBackupActivity",
      "runsOn": {
        "ref": "EmrClusterForBackup"
      },
      "type": "EmrActivity",
      "resizeClusterBeforeRunning": "true"
    },
    {
      "directoryPath": "#{myOutputS3Loc}/#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')}",
      "name": "S3BackupLocation",
      "id": "S3BackupLocation",
      "type": "S3DataNode"
    }
  ],
  "parameters": [
    {
      "description": "Output S3 folder",
      "id": "myOutputS3Loc",
      "type": "AWS::S3::ObjectKey"
    },
    {
      "description": "Source DynamoDB table name",
      "id": "myDDBTableName",
      "type": "String"
    },
    {
      "default": "us-east-1",
      "watermark": "us-east-1",
      "description": "Region of the DynamoDB table",
      "id": "myDDBRegion",
      "type": "String"
    }
  ],
  "values": {
    "myDDBRegion": "us-east-1",
    "myDDBTableName": "LIVE_Invoices",
    "myOutputS3Loc": "s3://company-live-extracts/"
  }
}

Here is the error from the Data Pipeline execution:

at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:322)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:198)
at org.apache.hadoop.mapreduce.Job.run(Job.java:1341)
at org.apache.hadoop.mapreduce.Job.run(Job.java:1338)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1338)
at org.apache.hadoop.mapred.JobClient.run(JobClient.java:575)
at org.apache.hadoop.mapred.JobClient.run(JobClient.java:570)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:570)
at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java

I opened a support ticket with AWS about this. Their response was very comprehensive, so I'll paste it below:


Thank you for reaching out to us about this issue.

Unfortunately, Data Pipeline export/import jobs for DynamoDB do not support DynamoDB's new On-Demand mode [1].

Tables using on-demand capacity do not have defined read and write capacity units. Data Pipeline relies on this defined capacity when calculating the pipeline's throughput.

For example, if you have 100 RCUs (Read Capacity Units) and a pipeline throughput ratio of 0.25 (25%), the effective pipeline throughput would be 25 read units per second (100 * 0.25). With On-Demand capacity, however, both the RCUs and WCUs (Write Capacity Units) are reported as 0, so the calculated effective throughput is 0 regardless of the pipeline throughput ratio.

The pipeline will not execute when the effective throughput is less than 1.
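
(A quick, illustrative way to see where that 0 comes from: describing an on-demand table shows its provisioned throughput as 0. The table name below is a placeholder.)

$ aws dynamodb describe-table --table-name myTable \
    --query 'Table.{Mode: BillingModeSummary.BillingMode, RCU: ProvisionedThroughput.ReadCapacityUnits, WCU: ProvisionedThroughput.WriteCapacityUnits}'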

Do you need to export your DynamoDB tables to S3 at all?

If you are exporting these tables purely for backup purposes, I recommend using DynamoDB's On-Demand Backup and Restore feature (confusingly, a similar name to On-Demand capacity) [2].

Note that on-demand backups do not affect table throughput, and they complete in seconds. You only pay for the S3 storage costs associated with the backups. However, these table backups are not directly accessible to customers; they can only be restored to the source table. This backup method is not suitable if you want to run analytics on the backup data, or to import the data into other systems, accounts, or tables.
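
(For reference, such a backup is a single CLI call; the table and backup names here are placeholders:)

$ aws dynamodb create-backup --table-name myTable --backup-name myTable-backup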

If you do need to export DynamoDB data with Data Pipeline, then the only way forward is to set the table(s) to Provisioned capacity mode.

You could do this manually or, using the AWS CLI commands [3], include it as an activity in the pipeline itself (a sketch of such an activity follows the examples below).

For example (On-Demand is also known as Pay Per Request mode):

$ aws dynamodb update-table --table-name myTable --billing-mode PROVISIONED --provisioned-throughput ReadCapacityUnits=100,WriteCapacityUnits=100

and to switch back to On-Demand afterwards:

$ aws dynamodb update-table --table-name myTable --billing-mode PAY_PER_REQUEST

Note that after disabling On-Demand capacity mode, you must wait 24 hours before you can enable it again.
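
To fold the mode switch into the pipeline itself, one option is a ShellCommandActivity that runs before the export step. This is only a sketch under a few assumptions: the object names are illustrative (not part of the stock template), the pipeline's resource role must be allowed to call dynamodb:UpdateTable, and in practice you would also wait for the table to return to ACTIVE status before the export begins:

{
  "name": "SetProvisionedMode",
  "id": "SetProvisionedMode",
  "type": "ShellCommandActivity",
  "runsOn": { "ref": "EmrClusterForBackup" },
  "command": "aws dynamodb update-table --region #{myDDBRegion} --table-name #{myDDBTableName} --billing-mode PROVISIONED --provisioned-throughput ReadCapacityUnits=100,WriteCapacityUnits=100"
}

TableBackupActivity would then declare "dependsOn": { "ref": "SetProvisionedMode" } so the export waits for this step.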

=== Reference Links ===

[1] DynamoDB On-Demand capacity (see also the note about unsupported services/tools): https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html#HowItWorks.OnDemand

[2] DynamoDB On-Demand Backup and Restore: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/BackupRestore.html

[3] AWS CLI reference for DynamoDB "update-table": https://docs.aws.amazon.com/cli/latest/reference/dynamodb/update-table.html

Support for on-demand tables was added to the DDB export tool earlier this year: GitHub commit

I was able to get this working by putting an updated build of the tool on S3 and updating a few things in the pipeline:

{
  "objects": [
    {
      "output": {
        "ref": "S3BackupLocation"
      },
      "input": {
        "ref": "DDBSourceTable"
      },
      "maximumRetries": "2",
      "name": "TableBackupActivity",
      "step": "s3://<your-tools-bucket>/emr-dynamodb-tools-4.11.0-SNAPSHOT.jar,org.apache.hadoop.dynamodb.tools.DynamoDBExport,#{output.directoryPath},#{input.tableName},#{input.readThroughputPercent}",
      "id": "TableBackupActivity",
      "runsOn": {
        "ref": "EmrClusterForBackup"
      },
      "type": "EmrActivity",
      "resizeClusterBeforeRunning": "true"
    },
    {
      "failureAndRerunMode": "CASCADE",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "role": "DataPipelineDefaultRole",
      "pipelineLogUri": "s3://<your-log-bucket>/",
      "scheduleType": "ONDEMAND",
      "name": "Default",
      "id": "Default"
    },
    {
      "readThroughputPercent": "#{myDDBReadThroughputRatio}",
      "name": "DDBSourceTable",
      "id": "DDBSourceTable",
      "type": "DynamoDBDataNode",
      "tableName": "#{myDDBTableName}"
    },
    {
      "directoryPath": "#{myOutputS3Loc}/#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')}",
      "name": "S3BackupLocation",
      "id": "S3BackupLocation",
      "type": "S3DataNode"
    },
    {
      "name": "EmrClusterForBackup",
      "coreInstanceCount": "1",
      "coreInstanceType": "m3.xlarge",
      "releaseLabel": "emr-5.26.0",
      "masterInstanceType": "m3.xlarge",
      "id": "EmrClusterForBackup",
      "region": "#{myDDBRegion}",
      "type": "EmrCluster",
      "terminateAfter": "1 Hour"
    }
  ],
  "parameters": [
    {
      "description": "Output S3 folder",
      "id": "myOutputS3Loc",
      "type": "AWS::S3::ObjectKey"
    },
    {
      "description": "Source DynamoDB table name",
      "id": "myDDBTableName",
      "type": "String"
    },
    {
      "default": "0.25",
      "watermark": "Enter value between 0.1-1.0",
      "description": "DynamoDB read throughput ratio",
      "id": "myDDBReadThroughputRatio",
      "type": "Double"
    },
    {
      "default": "us-east-1",
      "watermark": "us-east-1",
      "description": "Region of the DynamoDB table",
      "id": "myDDBRegion",
      "type": "String"
    }
  ],
  "values": {
    "myDDBRegion": "us-west-2",
    "myDDBTableName": "<your table name>",
    "myDDBReadThroughputRatio": "0.5",
    "myOutputS3Loc": "s3://<your-output-bucket>/"
  }
}

The main changes:

  • Updated the releaseLabel of EmrClusterForBackup to "emr-5.26.0". This is required to pick up the AWS SDK for Java v1.11.x and v4.11.0 of the DynamoDB connector (see the release matrix here: AWS docs)
  • Updated the step of TableBackupActivity as above: point it at the *.jar you built (a build sketch follows this list), and note that the tool's class name changed from DynamoDbExport to DynamoDBExport
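
For anyone wanting to reproduce the build, a rough sketch follows. It assumes the tool lives in the awslabs/emr-dynamodb-connector repository and that a stock Maven build produces the snapshot jar; the bucket name is a placeholder:

$ git clone https://github.com/awslabs/emr-dynamodb-connector.git
$ cd emr-dynamodb-connector
$ mvn clean package -DskipTests
$ aws s3 cp emr-dynamodb-tools/target/emr-dynamodb-tools-4.11.0-SNAPSHOT.jar s3://<your-tools-bucket>/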

Hopefully the default template gets updated as well, so that it works out of the box.