How to use Data Pipeline to copy data from one DynamoDB table to another when both have on-demand capacity

I have been using a pipeline.json to copy data from one DynamoDB table to another. It works when the source table has provisioned capacity, and it does not matter whether the destination is set to provisioned or on-demand. I want both of my tables to use on-demand capacity, but when I use the same template it does not work. Is there any way to do this, or is it still under development?

Here is my original working script:

{
    "objects": [
        {
            "startAt": "FIRST_ACTIVATION_DATE_TIME",
            "name": "DailySchedule",
            "id": "DailySchedule",
            "period": "1 day",
            "type": "Schedule",
            "occurrences": "1"
        },
        {
            "id": "Default",
            "name": "Default",
            "scheduleType": "ONDEMAND",
            "pipelineLogUri": "#{myS3LogsPath}",
            "schedule": {
                "ref": "DailySchedule"
            },
            "failureAndRerunMode": "CASCADE",
            "role": "DataPipelineDefaultRole",
            "resourceRole": "DataPipelineDefaultResourceRole"
        },
        {
            "id": "DDBSourceTable",
            "tableName": "#{myDDBSourceTableName}",
            "name": "DDBSourceTable",
            "type": "DynamoDBDataNode",
            "readThroughputPercent": "#{myDDBReadThroughputRatio}"
        },
        {
            "name": "S3TempLocation",
            "id": "S3TempLocation",
            "type": "S3DataNode",
            "directoryPath": "#{myTempS3Folder}/#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')}"
        },
        {
            "id": "DDBDestinationTable",
            "tableName": "#{myDDBDestinationTableName}",
            "name": "DDBDestinationTable",
            "type": "DynamoDBDataNode",
            "writeThroughputPercent": "#{myDDBWriteThroughputRatio}"
        },
        {
            "id": "EmrClusterForBackup",
            "name": "EmrClusterForBackup",
            "amiVersion": "3.8.0",
            "masterInstanceType": "m3.xlarge",
            "coreInstanceType": "m3.xlarge",
            "coreInstanceCount": "1",
            "region": "#{myDDBSourceRegion}",
            "terminateAfter": "10 Days",
            "type": "EmrCluster"
        },
        {
            "id": "EmrClusterForLoad",
            "name": "EmrClusterForLoad",
            "amiVersion": "3.8.0",
            "masterInstanceType": "m3.xlarge",
            "coreInstanceType": "m3.xlarge",
            "coreInstanceCount": "1",
            "region": "#{myDDBDestinationRegion}",
            "terminateAfter": "10 Days",
            "type": "EmrCluster"
        },
        {
            "id": "TableLoadActivity",
            "name": "TableLoadActivity",
            "runsOn": {
                "ref": "EmrClusterForLoad"
            },
            "input": {
                "ref": "S3TempLocation"
            },
            "output": {
                "ref": "DDBDestinationTable"
            },
            "type": "EmrActivity",
            "maximumRetries": "2",
            "dependsOn": {
               "ref": "TableBackupActivity"
            },
            "resizeClusterBeforeRunning": "true",
            "step": [
                "s3://dynamodb-emr-#{myDDBDestinationRegion}/emr-ddb-storage-handler/2.1.0/emr-ddb-2.1.0.jar,org.apache.hadoop.dynamodb.tools.DynamoDbImport,#{input.directoryPath},#{output.tableName},#{output.writeThroughputPercent}"
            ]
        },
        {
            "id": "TableBackupActivity",
            "name": "TableBackupActivity",
            "input": {
                "ref": "DDBSourceTable"
            },
            "output": {
                "ref": "S3TempLocation"
            },
            "runsOn": {
                "ref": "EmrClusterForBackup"
            },
            "resizeClusterBeforeRunning": "true",
            "type": "EmrActivity",
            "maximumRetries": "2",
            "step": [
                "s3://dynamodb-emr-#{myDDBSourceRegion}/emr-ddb-storage-handler/2.1.0/emr-ddb-2.1.0.jar,org.apache.hadoop.dynamodb.tools.DynamoDbExport,#{output.directoryPath},#{input.tableName},#{input.readThroughputPercent}"
            ]
        },
        {
            "dependsOn": {
                "ref": "TableLoadActivity"
            },
            "name": "S3CleanupActivity",
            "id": "S3CleanupActivity",
            "input": {
                "ref": "S3TempLocation"
            },
            "runsOn": {
               "ref": "EmrClusterForBackup"
            },
            "type": "ShellCommandActivity",
            "command": "(sudo yum -y update aws-cli) && (aws s3 rm #{input.directoryPath} --recursive)"
        }
    ],
    "parameters": [
        {
            "myComment": "This Parameter specifies the S3 logging path for the pipeline.  It is used by the 'Default' object to set the 'pipelineLogUri' value.",
            "id" : "myS3LogsPath",
            "type" : "AWS::S3::ObjectKey",
            "description" : "S3 path for pipeline logs."
        },
        {
            "id": "myDDBSourceTableName",
            "type": "String",
            "description": "Source DynamoDB table name"
        },
        {
            "id": "myDDBDestinationTableName",
            "type": "String",
            "description": "Target DynamoDB table name"
        },
        {
            "id": "myDDBWriteThroughputRatio",
            "type": "Double",
            "description": "DynamoDB write throughput ratio",
            "default": "1",
            "watermark": "Enter value between 0.1-1.0"
        },
        {
            "id": "myDDBSourceRegion",
            "type": "String",
            "description": "Region of the DynamoDB table",
            "default": "us-west-2"
        },
        {
            "id": "myDDBDestinationRegion",
            "type": "String",
            "description": "Region of the DynamoDB table",
            "default": "us-west-2"
        },
        {
            "id": "myDDBReadThroughputRatio",
            "type": "Double",
            "description": "DynamoDB read throughput ratio",
            "default": "1",
            "watermark": "Enter value between 0.1-1.0"
        },
        {
            "myComment": "Temporary S3 path to store the dynamodb backup csv files, backup files will be deleted after the copy completes",
            "id": "myTempS3Folder",
            "type": "AWS::S3::ObjectKey",
            "description": "Temporary S3 folder"
        }
    ]
}

This is the error message from the Data Pipeline run when the source DynamoDB table is set to on-demand capacity:

at org.apache.hadoop.mapreduce.JobSubmitter.writeOldSplits(JobSubmitter.java:520)
    at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:512)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:394)
    at org.apache.hadoop.mapreduce.Job.run(Job.java:1285)
    at org.apache.hadoop.mapreduce.Job.run(Job.java:1282)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1282)
    at org.apache.hadoop.mapred.JobClient.run(JobClient.java:562)
    at org.apache.hadoop.mapred.JobClient.run(JobClient.java:557)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:557)
    at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:548)
    at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:833)
    at org.apache.hadoop.dynamodb.tools.DynamoDbExport.run(DynamoDbExport.java:79)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at org.apache.hadoop.dynamodb.tools.DynamoDbExport.main(DynamoDbExport.java:30)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:212)

The following JSON file works for the export (DynamoDB to S3). Compared with the original template, it splits the copy into two separate pipelines and replaces the amiVersion 3.8.0 cluster and 2.1.0 storage handler with a releaseLabel emr-5.23.0 cluster and the 4.11.0 emr-dynamodb-tools jar, which works with on-demand capacity:

{
    "objects": [
        {
            "id": "Default",
            "name": "Default",
            "scheduleType": "ONDEMAND",
            "pipelineLogUri": "#{myS3LogsPath}",
            "failureAndRerunMode": "CASCADE",
            "role": "DataPipelineDefaultRole",
            "resourceRole": "DataPipelineDefaultResourceRole"
        },
        {
            "id": "DDBSourceTable",
            "tableName": "#{myDDBSourceTableName}",
            "name": "DDBSourceTable",
            "type": "DynamoDBDataNode",
            "readThroughputPercent": "#{myDDBReadThroughputRatio}"
        },
        {
            "name": "S3TempLocation",
            "id": "S3TempLocation",
            "type": "S3DataNode",
            "directoryPath": "#{myTempS3Folder}/data"
        },
        {
            "subnetId": "subnet-id",
            "id": "EmrClusterForBackup",
            "name": "EmrClusterForBackup",
            "masterInstanceType": "m5.xlarge",
            "coreInstanceType": "m5.xlarge",
            "coreInstanceCount": "1",
            "releaseLabel": "emr-5.23.0",
            "region": "#{myDDBSourceRegion}",
            "terminateAfter": "10 Days",
            "type": "EmrCluster"
        },
        {
            "id": "TableBackupActivity",
            "name": "TableBackupActivity",
            "input": {
                "ref": "DDBSourceTable"
            },
            "output": {
                "ref": "S3TempLocation"
            },
            "runsOn": {
                "ref": "EmrClusterForBackup"
            },
            "resizeClusterBeforeRunning": "true",
            "type": "EmrActivity",
            "maximumRetries": "2",
            "step": [
                "s3://dynamodb-dpl-#{myDDBSourceRegion}/emr-ddb-storage-handler/4.11.0/emr-dynamodb-tools-4.11.0-SNAPSHOT-jar-with-dependencies.jar,org.apache.hadoop.dynamodb.tools.DynamoDBExport,#{output.directoryPath},#{input.tableName},#{input.readThroughputPercent}"
            ]
        }
    ],
    "parameters": [
        {
            "myComment": "This Parameter specifies the S3 logging path for the pipeline.  It is used by the 'Default' object to set the 'pipelineLogUri' value.",
            "id" : "myS3LogsPath",
            "type" : "AWS::S3::ObjectKey",
            "description" : "S3 path for pipeline logs."
        },
        {
            "id": "myDDBSourceTableName",
            "type": "String",
            "description": "Source DynamoDB table name"
        },
        {
            "id": "myDDBSourceRegion",
            "type": "String",
            "description": "Region of the DynamoDB table",
            "default": "us-west-2"
        },
        {
            "id": "myDDBReadThroughputRatio",
            "type": "Double",
            "description": "DynamoDB read throughput ratio",
            "default": "1",
            "watermark": "Enter value between 0.1-1.0"
        },
        {
            "myComment": "Temporary S3 path to store the dynamodb backup csv files, backup files will be deleted after the copy completes",
            "id": "myTempS3Folder",
            "type": "AWS::S3::ObjectKey",
            "description": "Temporary S3 folder"
        }
    ]
}
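
For reference, here is a minimal sketch of how the export pipeline could be created and activated with the AWS CLI. The pipeline name, the file name export-pipeline.json, the bucket, and all parameter values below are hypothetical placeholders; parameter values can also be filled in through the console or a --parameter-values-uri file instead of on the command line.

# Create an empty pipeline and capture its id (names and values are placeholders).
EXPORT_ID=$(aws datapipeline create-pipeline \
    --name ddb-export-to-s3 --unique-id ddb-export-to-s3-001 \
    --query pipelineId --output text)

# Upload the definition above (saved as export-pipeline.json) with parameter values.
aws datapipeline put-pipeline-definition \
    --pipeline-id "$EXPORT_ID" \
    --pipeline-definition file://export-pipeline.json \
    --parameter-values \
        myS3LogsPath=s3://my-bucket/logs \
        myTempS3Folder=s3://my-bucket/ddb-temp \
        myDDBSourceTableName=SourceTable \
        myDDBSourceRegion=us-west-2 \
        myDDBReadThroughputRatio=1

# Because scheduleType is ONDEMAND, activating the pipeline runs it immediately.
aws datapipeline activate-pipeline --pipeline-id "$EXPORT_ID"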

And the following works for the import (S3 to DynamoDB):

{
    "objects": [
        {
            "id": "Default",
            "name": "Default",
            "scheduleType": "ONDEMAND",
            "pipelineLogUri": "#{myS3LogsPath}",
            "failureAndRerunMode": "CASCADE",
            "role": "DataPipelineDefaultRole",
            "resourceRole": "DataPipelineDefaultResourceRole"
        },
        {
            "name": "S3TempLocation",
            "id": "S3TempLocation",
            "type": "S3DataNode",
            "directoryPath": "#{myTempS3Folder}/data"
        },
        {
            "id": "DDBDestinationTable",
            "tableName": "#{myDDBDestinationTableName}",
            "name": "DDBDestinationTable",
            "type": "DynamoDBDataNode",
            "writeThroughputPercent": "#{myDDBWriteThroughputRatio}"
        },
        {
            "subnetId": "subnet-id",
            "id": "EmrClusterForLoad",
            "name": "EmrClusterForLoad",
            "releaseLabel": "emr-5.23.0",
            "masterInstanceType": "m5.xlarge",
            "coreInstanceType": "m5.xlarge",
            "coreInstanceCount": "1",
            "region": "#{myDDBDestinationRegion}",
            "terminateAfter": "10 Days",
            "type": "EmrCluster"
        },
        {
            "id": "TableLoadActivity",
            "name": "TableLoadActivity",
            "runsOn": {
                "ref": "EmrClusterForLoad"
            },
            "input": {
                "ref": "S3TempLocation"
            },
            "output": {
                "ref": "DDBDestinationTable"
            },
            "type": "EmrActivity",
            "maximumRetries": "2",
            "resizeClusterBeforeRunning": "true",
            "step": [
                "s3://dynamodb-dpl-#{myDDBDestinationRegion}/emr-ddb-storage-handler/4.11.0/emr-dynamodb-tools-4.11.0-SNAPSHOT-jar-with-dependencies.jar,org.apache.hadoop.dynamodb.tools.DynamoDBImport,#{input.directoryPath},#{output.tableName},#{output.writeThroughputPercent}"
            ]
        },
        {
            "dependsOn": {
                "ref": "TableLoadActivity"
            },
            "name": "S3CleanupActivity",
            "id": "S3CleanupActivity",
            "input": {
                "ref": "S3TempLocation"
            },
            "runsOn": {
               "ref": "EmrClusterForLoad"
            },
            "type": "ShellCommandActivity",
            "command": "(sudo yum -y update aws-cli) && (aws s3 rm #{input.directoryPath} --recursive)"
        }
    ],
    "parameters": [
        {
            "myComment": "This Parameter specifies the S3 logging path for the pipeline.  It is used by the 'Default' object to set the 'pipelineLogUri' value.",
            "id" : "myS3LogsPath",
            "type" : "AWS::S3::ObjectKey",
            "description" : "S3 path for pipeline logs."
        },
        {
            "id": "myDDBDestinationTableName",
            "type": "String",
            "description": "Target DynamoDB table name"
        },
        {
            "id": "myDDBWriteThroughputRatio",
            "type": "Double",
            "description": "DynamoDB write throughput ratio",
            "default": "1",
            "watermark": "Enter value between 0.1-1.0"
        },
        {
            "id": "myDDBDestinationRegion",
            "type": "String",
            "description": "Region of the DynamoDB table",
            "default": "us-west-2"
        },
        {
            "myComment": "Temporary S3 path to store the dynamodb backup csv files, backup files will be deleted after the copy completes",
            "id": "myTempS3Folder",
            "type": "AWS::S3::ObjectKey",
            "description": "Temporary S3 folder"
        }
    ]
}

Also, the subnetId fields in both pipeline definitions are entirely optional, but it is always good to set them.
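
Finally, a rough sketch of running the import side once the export has finished; import-pipeline.json and the values below are again placeholders, and myTempS3Folder must point at the same S3 folder the export wrote to, since both definitions use #{myTempS3Folder}/data.

# Create and activate the import pipeline after the export pipeline has completed.
IMPORT_ID=$(aws datapipeline create-pipeline \
    --name s3-import-to-ddb --unique-id s3-import-to-ddb-001 \
    --query pipelineId --output text)

aws datapipeline put-pipeline-definition \
    --pipeline-id "$IMPORT_ID" \
    --pipeline-definition file://import-pipeline.json \
    --parameter-values \
        myS3LogsPath=s3://my-bucket/logs \
        myTempS3Folder=s3://my-bucket/ddb-temp \
        myDDBDestinationTableName=DestinationTable \
        myDDBDestinationRegion=us-west-2 \
        myDDBWriteThroughputRatio=1

aws datapipeline activate-pipeline --pipeline-id "$IMPORT_ID"

# Check progress; the S3CleanupActivity removes the temporary data after the load succeeds.
aws datapipeline list-runs --pipeline-id "$IMPORT_ID"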