Upload bulk csv data into existing DynamoDB table
I'm trying to migrate data from a csv file into an existing AWS DynamoDB table, as part of an AWS Amplify web app. I followed this CloudFormation tutorial, using the template below. I was only able to create a new DynamoDB table; I could not point the stack at an existing table and add data to it.
Question:
Is there a way to modify the template so that I can provide the name of an existing table under "DynamoDBTableName" at the "Specify stack details" step of the wizard, and have the csv data added to that table? If not, is there an alternative process?
{
"AWSTemplateFormatVersion": "2010-09-09",
"Metadata": {
},
"Parameters" : {
"BucketName": {
"Description": "Name of the S3 bucket you will deploy the CSV file to",
"Type": "String",
"ConstraintDescription": "must be a valid bucket name."
},
"FileName": {
"Description": "Name of the S3 file (including suffix)",
"Type": "String",
"ConstraintDescription": "Valid S3 file name."
},
"DynamoDBTableName": {
"Description": "Name of the dynamoDB table you will use",
"Type": "String",
"ConstraintDescription": "must be a valid dynamoDB name."
}
},
"Resources": {
"DynamoDBTable":{
"Type": "AWS::DynamoDB::Table",
"Properties":{
"TableName": {"Ref" : "DynamoDBTableName"},
"BillingMode": "PAY_PER_REQUEST",
"AttributeDefinitions":[
{
"AttributeName": "id",
"AttributeType": "S"
}
],
"KeySchema":[
{
"AttributeName": "id",
"KeyType": "HASH"
}
],
"Tags":[
{
"Key": "Name",
"Value": {"Ref" : "DynamoDBTableName"}
}
]
}
},
"LambdaRole" : {
"Type" : "AWS::IAM::Role",
"Properties" : {
"AssumeRolePolicyDocument": {
"Version" : "2012-10-17",
"Statement" : [
{
"Effect" : "Allow",
"Principal" : {
"Service" : ["lambda.amazonaws.com","s3.amazonaws.com"]
},
"Action" : [
"sts:AssumeRole"
]
}
]
},
"Path" : "/",
"ManagedPolicyArns":["arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole","arn:aws:iam::aws:policy/AWSLambdaInvocation-DynamoDB","arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"],
"Policies": [{
"PolicyName": "policyname",
"PolicyDocument": {
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Resource": "*",
"Action": [
"dynamodb:PutItem",
"dynamodb:BatchWriteItem"
]
}]
}
}]
}
},
"CsvToDDBLambdaFunction": {
"Type": "AWS::Lambda::Function",
"Properties": {
"Handler": "index.lambda_handler",
"Role": {
"Fn::GetAtt": [
"LambdaRole",
"Arn"
]
},
"Code": {
"ZipFile": {
"Fn::Join": [
"\n",
[
"import json",
"import boto3",
"import os",
"import csv",
"import codecs",
"import sys",
"",
"s3 = boto3.resource('s3')",
"dynamodb = boto3.resource('dynamodb')",
"",
"bucket = os.environ['bucket']",
"key = os.environ['key']",
"tableName = os.environ['table']",
"",
"def lambda_handler(event, context):",
"",
"",
" #get() does not store in memory",
" try:",
" obj = s3.Object(bucket, key).get()['Body']",
" except:",
" print(\"S3 Object could not be opened. Check environment variable. \")",
" try:",
" table = dynamodb.Table(tableName)",
" except:",
" print(\"Error loading DynamoDB table. Check if table was created correctly and environment variable.\")",
"",
" batch_size = 100",
" batch = []",
"",
" #DictReader is a generator; not stored in memory",
" for row in csv.DictReader(codecs.getreader('utf-8-sig')(obj)):",
" if len(batch) >= batch_size:",
" write_to_dynamo(batch)",
" batch.clear()",
"",
" batch.append(row)",
"",
" if batch:",
" write_to_dynamo(batch)",
"",
" return {",
" 'statusCode': 200,",
" 'body': json.dumps('Uploaded to DynamoDB Table')",
" }",
"",
"",
"def write_to_dynamo(rows):",
" try:",
" table = dynamodb.Table(tableName)",
" except:",
" print(\"Error loading DynamoDB table. Check if table was created correctly and environment variable.\")",
"",
" try:",
" with table.batch_writer() as batch:",
" for i in range(len(rows)):",
" batch.put_item(",
" Item=rows[i]",
" )",
" except:",
" print(\"Error executing batch_writer\")"
]
]
}
},
"Runtime": "python3.7",
"Timeout": 900,
"MemorySize": 3008,
"Environment" : {
"Variables" : {"bucket" : { "Ref" : "BucketName" }, "key" : { "Ref" : "FileName" },"table" : { "Ref" : "DynamoDBTableName" }}
}
}
},
"S3Bucket": {
"DependsOn" : ["CsvToDDBLambdaFunction","BucketPermission"],
"Type": "AWS::S3::Bucket",
"Properties": {
"BucketName": {"Ref" : "BucketName"},
"AccessControl": "BucketOwnerFullControl",
"NotificationConfiguration":{
"LambdaConfigurations":[
{
"Event":"s3:ObjectCreated:*",
"Function":{
"Fn::GetAtt": [
"CsvToDDBLambdaFunction",
"Arn"
]
}
}
]
}
}
},
"BucketPermission":{
"Type": "AWS::Lambda::Permission",
"Properties":{
"Action": "lambda:InvokeFunction",
"FunctionName":{"Ref" : "CsvToDDBLambdaFunction"},
"Principal": "s3.amazonaws.com",
"SourceAccount": {"Ref":"AWS::AccountId"}
}
}
},
"Outputs" : {
}
}
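For reference, the stack is event-driven: the bucket's NotificationConfiguration invokes the Lambda on s3:ObjectCreated, so the import runs when the csv file lands in the bucket. A minimal sketch of that upload step with boto3, where the file name and bucket name are placeholders that must match the FileName and BucketName stack parameters:

import boto3

# Placeholders: must match the BucketName and FileName stack parameters.
boto3.client("s3").upload_file("data.csv", "your-bucket-name", "data.csv")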
Another answer
Dennis's answer is one solution, but you can also remove the "DynamoDBTable" section from "Resources" in the JSON file (JSON has no comment syntax, so the block must be deleted rather than commented out). The Lambda reads the table name from its "table" environment variable, which is a Ref to the "DynamoDBTableName" parameter rather than to the table resource, so once the resource is gone the stack simply writes to whatever existing table name you supply. A trimmed sketch of the remaining "Resources" section is shown below.
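For illustration, a minimal sketch of the trimmed "Resources" section; "..." marks resource bodies that stay exactly as in the template above:

"Resources": {
    "LambdaRole" : { ... },
    "CsvToDDBLambdaFunction": { ... },
    "S3Bucket": { ... },
    "BucketPermission": { ... }
}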
Another answer
You can use AWS Database Migration Service (DMS). Have a look at this step-by-step walkthrough: Migrate a CSV file from Amazon S3 to Amazon DynamoDB.
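If you only need a one-off load and want to avoid both CloudFormation and DMS, a short boto3 script run from your own machine can also batch-write the csv into the existing table. A minimal sketch, assuming the csv has a header row whose column names match the table's attributes (including the "id" partition key), that your local AWS credentials can write to the table, and that the table name and file path below are placeholders:

import csv
import boto3

TABLE_NAME = "your-existing-table"  # placeholder: name of the existing DynamoDB table
CSV_PATH = "data.csv"               # placeholder: local path to the csv file

def main():
    table = boto3.resource("dynamodb").Table(TABLE_NAME)
    # batch_writer() buffers puts into BatchWriteItem calls and
    # automatically retries any unprocessed items.
    with table.batch_writer() as batch:
        with open(CSV_PATH, newline="", encoding="utf-8-sig") as f:
            for row in csv.DictReader(f):
                batch.put_item(Item=row)  # each csv row becomes one item

if __name__ == "__main__":
    main()

This is essentially what the template's Lambda does, minus the S3 trigger, so it is also a convenient way to test the data before wiring up the stack.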