DynamoDB 如何将超过 25 items/rows 写入 Table?
How to write more than 25 items/rows into Table for DynamoDB?
我对 Amazon DynamoDB 还很陌生。我目前有 20000 行需要添加到 table。但是,根据我所读的内容,似乎我一次只能使用 BatchWriteItem class 和 25 个 WriteRequests 写入最多 25 行。有可能增加这个吗?我怎样才能一次写超过 25 行?目前写入所有 20000 行大约需要 15 分钟。谢谢。
您最多只能在单个 BatchWriteItem 请求中发送 25 个项目,但您可以一次发送任意数量的 BatchWriteItem 请求。假设您 provisioned enough write throughput,您应该能够通过将这 20k 行拆分为多个 threads/processes/hosts 并将它们并行推送到数据库来显着加快速度。
对于这么小的数据集,它可能有点重量级,但您可以使用 AWS Data Pipeline 从 S3 提取数据。它基本上自动化了创建 Hadoop 集群的过程,以从 S3 提取数据并通过一系列并行 BatchWriteItem 请求将其发送到 DynamoDB。
我正在寻找一些代码来使用 JavaScript SDK 执行此操作。我找不到,所以我自己把它放在一起。我希望这对其他人有帮助!
function multiWrite(table, data, cb) {
var AWS = require('aws-sdk');
var db = new AWS.DynamoDB.DocumentClient({region: 'us-east-1'});
// Build the batches
var batches = [];
var current_batch = [];
var item_count = 0;
for(var x in data) {
// Add the item to the current batch
item_count++;
current_batch.push({
PutRequest: {
Item: data[x]
}
});
// If we've added 25 items, add the current batch to the batches array
// and reset it
if(item_count%25 == 0) {
batches.push(current_batch);
current_batch = [];
}
}
// Add the last batch if it has records and is not equal to 25
if(current_batch.length > 0 && current_batch.length != 25) batches.push(current_batch);
// Handler for the database operations
var completed_requests = 0;
var errors = false;
function handler(request) {
return function(err, data) {
// Increment the completed requests
completed_requests++;
// Set the errors flag
errors = (errors) ? true : err;
// Log the error if we got one
if(err) {
console.error(JSON.stringify(err, null, 2));
console.error("Request that caused database error:");
console.error(JSON.stringify(request, null, 2));
}
// Make the callback if we've completed all the requests
if(completed_requests == batches.length) {
cb(errors);
}
}
}
// Make the requests
var params;
for(x in batches) {
// Items go in params.RequestItems.id array
// Format for the items is {PutRequest: {Item: ITEM_OBJECT}}
params = '{"RequestItems": {"' + table + '": []}}';
params = JSON.parse(params);
params.RequestItems[table] = batches[x];
// Perform the batchWrite operation
db.batchWrite(params, handler(params));
}
}
function putInHistory(data,cb) {
var arrayOfArray25 = _.chunk(data, 25);
async.every(arrayOfArray25, function(arrayOf25, callback) {
var params = {
RequestItems: {
[TABLES.historyTable]: []
}
};
arrayOf25.forEach(function(item){
params.RequestItems[TABLES.historyTable].push({
PutRequest: {
Item: item
}
})
});
docClient.batchWrite(params, function(err, data) {
if (err){
console.log(err);
callback(err);
} else {
console.log(data);
callback(null, true);
};
});
}, function(err, result) {
if(err){
cb(err);
} else {
if(result){
cb(null,{allWritten:true});
} else {
cb(null,{allWritten:false});
}
}
});
}
您可以使用 lodash 从数组中生成数据块,然后使用异步库的 each/every 方法对 25 个元素的块执行 batchWrite
来自@Geerek 的回答是带有 lambda 函数的解决方案:
exports.handler = (event, context, callback) => {
console.log(`EVENT: ${JSON.stringify(event)}`);
var AWS = require('aws-sdk');
AWS.config.update({ region: process.env.REGION })
var docClient = new AWS.DynamoDB.DocumentClient();
const {data, table, cb} = event
// Build the batches
var batches = [];
var current_batch = [];
var item_count = 0;
for (var i = 0; i < data.length; i++) {
// Add the item to the current batch
item_count++
current_batch.push({
PutRequest: {
Item: data[i],
},
})
// If we've added 25 items, add the current batch to the batches array
// and reset it
if (item_count % 25 === 0) {
batches.push(current_batch)
current_batch = []
}
}
// Add the last batch if it has records and is not equal to 25
if (current_batch.length > 0 && current_batch.length !== 25) {
batches.push(current_batch)
}
// Handler for the database operations
var completed_requests = 0
var errors = false
function handler (request) {
console.log('in the handler: ', request)
return function (err, data) {
// Increment the completed requests
completed_requests++;
// Set the errors flag
errors = (errors) ? true : err;
// Log the error if we got one
if(err) {
console.error(JSON.stringify(err, null, 2));
console.error("Request that caused database error:");
console.error(JSON.stringify(request, null, 2));
callback(err);
}else {
callback(null, data);
}
// Make the callback if we've completed all the requests
if(completed_requests === batches.length) {
cb(errors);
}
}
}
// Make the requests
var params;
for (var j = 0; j < batches.length; j++) {
// Items go in params.RequestItems.id array
// Format for the items is {PutRequest: {Item: ITEM_OBJECT}}
params = '{"RequestItems": {"' + table + '": []}}'
params = JSON.parse(params)
params.RequestItems[table] = batches[j]
console.log('before db.batchWrite: ', params)
// Perform the batchWrite operation
docClient.batchWrite(params, handler(params))
}
};
我写了一个 npm package 应该可以作为 batchWrite
方法的简单替代品,你只需要将 dynamoDB 实例作为第一个参数传递,事情应该可以工作:
https://www.npmjs.com/package/batch-write-all
查看项目自述文件中的示例:
// Use bellow instead of this: dynamodb.batchWrite(params).promise();
batchWriteAll(dynamodb, params).promise();
使用 aws cli 和 aws-vault,这就是我所做的。
假设您有以下包含 1000 行的文件 (data.json)
{ "PutRequest": { "Item": { "PKey": { "S": "1" }, "SKey": { "S": "A" }}}},
{ "PutRequest": { "Item": { "PKey": { "S": "2" }, "SKey": { "S": "B" }}}},
{ "PutRequest": { "Item": { "PKey": { "S": "3" }, "SKey": { "S": "C" }}}},
... to 1000
并且您需要将其拆分为每个 25 行的块文件!
我在 LinqPad 中使用以下 c# 代码生成 .sh 文件和 json 块以便能够使用 aws cli
将它们插入 dynamodb
void Main()
{
var sourcePath= @"D:\data\whereYourMainJsonFileIsLocated\";
var sourceFilePath = @"data.json";
var awsVaultProfileName = "dev";
var env = "dev";
var tableName = "dynamodb-table-name";
var lines = System.IO.File.ReadAllLines(sourcePath + sourceFilePath);
var destinationPath = Path.Combine(sourcePath, env);
var destinationChunkPath = Path.Combine(sourcePath, env, "chunks");
if (!System.IO.Directory.Exists(destinationChunkPath))
System.IO.Directory.CreateDirectory(destinationChunkPath);
System.Text.StringBuilder shString= new System.Text.StringBuilder();
for (int i = 0; i < lines.Count(); i = i+25)
{
var pagedLines = lines.Skip(i).Take(25).ToList().Distinct().ToList();
System.Text.StringBuilder sb = new System.Text.StringBuilder();
sb.AppendLine("{");
sb.AppendLine($" \"{tableName}\": [");
foreach (var element in pagedLines)
{
if (element == pagedLines.Last())
sb.AppendLine(element.Substring(0, element.Length-1));
else
sb.AppendLine(element);
}
sb.AppendLine("]");
sb.AppendLine("}");
var fileName = $"chunk{i / 25}.json";
System.IO.File.WriteAllText(Path.Combine(destinationChunkPath, fileName), sb.ToString(), Encoding.Default);
shString.AppendLine($@"aws-vault.exe exec {awsVaultProfileName} -- aws dynamodb batch-write-item --request-items file://chunks/{fileName}");
}
System.IO.File.WriteAllText(Path.Combine(destinationPath, $"{tableName}-{env}.sh"), shString.ToString(), Encoding.Default);
}
结果将是块文件,如 chunk0.json、chunk1.json 等
{
"dynamodb-table-name": [
{ "PutRequest": { "Item": { "PKey": { "S": "1" }, "SKey": { "S": "A" }}}},
{ "PutRequest": { "Item": { "PKey": { "S": "2" }, "SKey": { "S": "B" }}}},
{ "PutRequest": { "Item": { "PKey": { "S": "3" }, "SKey": { "S": "C" }}}}
]
}
和.sh文件
aws-vault.exe exec dev -- aws dynamodb batch-write-item --request-items file://chunks/chunk0.json
aws-vault.exe exec dev -- aws dynamodb batch-write-item --request-items file://chunks/chunk1.json
aws-vault.exe exec dev -- aws dynamodb batch-write-item --request-items file://chunks/chunk2.json
最后 运行 .sh 文件,所有数据都在 table!
const { dynamoClient } = require("./resources/db");
const { v4: uuid } = require("uuid");
const batchWriteLooper = async () => {
let array = [];
for (let i = 0; i < 2000; i++) {
array.push({
PutRequest: {
Item: {
personId: uuid(),
name: `Person ${i}`,
age: Math.floor(Math.random() * 100),
gender: "Male",
createdAt: new Date(),
updatedAt: new Date(),
},
},
});
}
var perChunk = 20; // items per chunk
var result = array.reduce((resultArray, item, index) => {
const chunkIndex = Math.floor(index / perChunk);
if (!resultArray[chunkIndex]) {
resultArray[chunkIndex] = []; // start a new chunk
}
resultArray[chunkIndex].push(item);
return resultArray;
}, []);
Promise.all(
result.map(async (chunk) => {
const params = {
RequestItems: {
"persons": chunk,
},
};
return await dynamoClient.batchWrite(params).promise();
})
).then(() => {
console.log("done");
});
};
batchWriteLooper();
我对 Amazon DynamoDB 还很陌生。我目前有 20000 行需要添加到 table。但是,根据我所读的内容,似乎我一次只能使用 BatchWriteItem class 和 25 个 WriteRequests 写入最多 25 行。有可能增加这个吗?我怎样才能一次写超过 25 行?目前写入所有 20000 行大约需要 15 分钟。谢谢。
您最多只能在单个 BatchWriteItem 请求中发送 25 个项目,但您可以一次发送任意数量的 BatchWriteItem 请求。假设您 provisioned enough write throughput,您应该能够通过将这 20k 行拆分为多个 threads/processes/hosts 并将它们并行推送到数据库来显着加快速度。
对于这么小的数据集,它可能有点重量级,但您可以使用 AWS Data Pipeline 从 S3 提取数据。它基本上自动化了创建 Hadoop 集群的过程,以从 S3 提取数据并通过一系列并行 BatchWriteItem 请求将其发送到 DynamoDB。
我正在寻找一些代码来使用 JavaScript SDK 执行此操作。我找不到,所以我自己把它放在一起。我希望这对其他人有帮助!
function multiWrite(table, data, cb) {
var AWS = require('aws-sdk');
var db = new AWS.DynamoDB.DocumentClient({region: 'us-east-1'});
// Build the batches
var batches = [];
var current_batch = [];
var item_count = 0;
for(var x in data) {
// Add the item to the current batch
item_count++;
current_batch.push({
PutRequest: {
Item: data[x]
}
});
// If we've added 25 items, add the current batch to the batches array
// and reset it
if(item_count%25 == 0) {
batches.push(current_batch);
current_batch = [];
}
}
// Add the last batch if it has records and is not equal to 25
if(current_batch.length > 0 && current_batch.length != 25) batches.push(current_batch);
// Handler for the database operations
var completed_requests = 0;
var errors = false;
function handler(request) {
return function(err, data) {
// Increment the completed requests
completed_requests++;
// Set the errors flag
errors = (errors) ? true : err;
// Log the error if we got one
if(err) {
console.error(JSON.stringify(err, null, 2));
console.error("Request that caused database error:");
console.error(JSON.stringify(request, null, 2));
}
// Make the callback if we've completed all the requests
if(completed_requests == batches.length) {
cb(errors);
}
}
}
// Make the requests
var params;
for(x in batches) {
// Items go in params.RequestItems.id array
// Format for the items is {PutRequest: {Item: ITEM_OBJECT}}
params = '{"RequestItems": {"' + table + '": []}}';
params = JSON.parse(params);
params.RequestItems[table] = batches[x];
// Perform the batchWrite operation
db.batchWrite(params, handler(params));
}
}
function putInHistory(data,cb) {
var arrayOfArray25 = _.chunk(data, 25);
async.every(arrayOfArray25, function(arrayOf25, callback) {
var params = {
RequestItems: {
[TABLES.historyTable]: []
}
};
arrayOf25.forEach(function(item){
params.RequestItems[TABLES.historyTable].push({
PutRequest: {
Item: item
}
})
});
docClient.batchWrite(params, function(err, data) {
if (err){
console.log(err);
callback(err);
} else {
console.log(data);
callback(null, true);
};
});
}, function(err, result) {
if(err){
cb(err);
} else {
if(result){
cb(null,{allWritten:true});
} else {
cb(null,{allWritten:false});
}
}
});
}
您可以使用 lodash 从数组中生成数据块,然后使用异步库的 each/every 方法对 25 个元素的块执行 batchWrite
来自@Geerek 的回答是带有 lambda 函数的解决方案:
exports.handler = (event, context, callback) => {
console.log(`EVENT: ${JSON.stringify(event)}`);
var AWS = require('aws-sdk');
AWS.config.update({ region: process.env.REGION })
var docClient = new AWS.DynamoDB.DocumentClient();
const {data, table, cb} = event
// Build the batches
var batches = [];
var current_batch = [];
var item_count = 0;
for (var i = 0; i < data.length; i++) {
// Add the item to the current batch
item_count++
current_batch.push({
PutRequest: {
Item: data[i],
},
})
// If we've added 25 items, add the current batch to the batches array
// and reset it
if (item_count % 25 === 0) {
batches.push(current_batch)
current_batch = []
}
}
// Add the last batch if it has records and is not equal to 25
if (current_batch.length > 0 && current_batch.length !== 25) {
batches.push(current_batch)
}
// Handler for the database operations
var completed_requests = 0
var errors = false
function handler (request) {
console.log('in the handler: ', request)
return function (err, data) {
// Increment the completed requests
completed_requests++;
// Set the errors flag
errors = (errors) ? true : err;
// Log the error if we got one
if(err) {
console.error(JSON.stringify(err, null, 2));
console.error("Request that caused database error:");
console.error(JSON.stringify(request, null, 2));
callback(err);
}else {
callback(null, data);
}
// Make the callback if we've completed all the requests
if(completed_requests === batches.length) {
cb(errors);
}
}
}
// Make the requests
var params;
for (var j = 0; j < batches.length; j++) {
// Items go in params.RequestItems.id array
// Format for the items is {PutRequest: {Item: ITEM_OBJECT}}
params = '{"RequestItems": {"' + table + '": []}}'
params = JSON.parse(params)
params.RequestItems[table] = batches[j]
console.log('before db.batchWrite: ', params)
// Perform the batchWrite operation
docClient.batchWrite(params, handler(params))
}
};
我写了一个 npm package 应该可以作为 batchWrite
方法的简单替代品,你只需要将 dynamoDB 实例作为第一个参数传递,事情应该可以工作:
https://www.npmjs.com/package/batch-write-all
查看项目自述文件中的示例:
// Use bellow instead of this: dynamodb.batchWrite(params).promise();
batchWriteAll(dynamodb, params).promise();
使用 aws cli 和 aws-vault,这就是我所做的。
假设您有以下包含 1000 行的文件 (data.json)
{ "PutRequest": { "Item": { "PKey": { "S": "1" }, "SKey": { "S": "A" }}}},
{ "PutRequest": { "Item": { "PKey": { "S": "2" }, "SKey": { "S": "B" }}}},
{ "PutRequest": { "Item": { "PKey": { "S": "3" }, "SKey": { "S": "C" }}}},
... to 1000
并且您需要将其拆分为每个 25 行的块文件!
我在 LinqPad 中使用以下 c# 代码生成 .sh 文件和 json 块以便能够使用 aws cli
将它们插入 dynamodbvoid Main()
{
var sourcePath= @"D:\data\whereYourMainJsonFileIsLocated\";
var sourceFilePath = @"data.json";
var awsVaultProfileName = "dev";
var env = "dev";
var tableName = "dynamodb-table-name";
var lines = System.IO.File.ReadAllLines(sourcePath + sourceFilePath);
var destinationPath = Path.Combine(sourcePath, env);
var destinationChunkPath = Path.Combine(sourcePath, env, "chunks");
if (!System.IO.Directory.Exists(destinationChunkPath))
System.IO.Directory.CreateDirectory(destinationChunkPath);
System.Text.StringBuilder shString= new System.Text.StringBuilder();
for (int i = 0; i < lines.Count(); i = i+25)
{
var pagedLines = lines.Skip(i).Take(25).ToList().Distinct().ToList();
System.Text.StringBuilder sb = new System.Text.StringBuilder();
sb.AppendLine("{");
sb.AppendLine($" \"{tableName}\": [");
foreach (var element in pagedLines)
{
if (element == pagedLines.Last())
sb.AppendLine(element.Substring(0, element.Length-1));
else
sb.AppendLine(element);
}
sb.AppendLine("]");
sb.AppendLine("}");
var fileName = $"chunk{i / 25}.json";
System.IO.File.WriteAllText(Path.Combine(destinationChunkPath, fileName), sb.ToString(), Encoding.Default);
shString.AppendLine($@"aws-vault.exe exec {awsVaultProfileName} -- aws dynamodb batch-write-item --request-items file://chunks/{fileName}");
}
System.IO.File.WriteAllText(Path.Combine(destinationPath, $"{tableName}-{env}.sh"), shString.ToString(), Encoding.Default);
}
结果将是块文件,如 chunk0.json、chunk1.json 等
{
"dynamodb-table-name": [
{ "PutRequest": { "Item": { "PKey": { "S": "1" }, "SKey": { "S": "A" }}}},
{ "PutRequest": { "Item": { "PKey": { "S": "2" }, "SKey": { "S": "B" }}}},
{ "PutRequest": { "Item": { "PKey": { "S": "3" }, "SKey": { "S": "C" }}}}
]
}
和.sh文件
aws-vault.exe exec dev -- aws dynamodb batch-write-item --request-items file://chunks/chunk0.json
aws-vault.exe exec dev -- aws dynamodb batch-write-item --request-items file://chunks/chunk1.json
aws-vault.exe exec dev -- aws dynamodb batch-write-item --request-items file://chunks/chunk2.json
最后 运行 .sh 文件,所有数据都在 table!
const { dynamoClient } = require("./resources/db");
const { v4: uuid } = require("uuid");
const batchWriteLooper = async () => {
let array = [];
for (let i = 0; i < 2000; i++) {
array.push({
PutRequest: {
Item: {
personId: uuid(),
name: `Person ${i}`,
age: Math.floor(Math.random() * 100),
gender: "Male",
createdAt: new Date(),
updatedAt: new Date(),
},
},
});
}
var perChunk = 20; // items per chunk
var result = array.reduce((resultArray, item, index) => {
const chunkIndex = Math.floor(index / perChunk);
if (!resultArray[chunkIndex]) {
resultArray[chunkIndex] = []; // start a new chunk
}
resultArray[chunkIndex].push(item);
return resultArray;
}, []);
Promise.all(
result.map(async (chunk) => {
const params = {
RequestItems: {
"persons": chunk,
},
};
return await dynamoClient.batchWrite(params).promise();
})
).then(() => {
console.log("done");
});
};
batchWriteLooper();