How do I write more than 25 items/rows into a DynamoDB table?

I am quite new to Amazon DynamoDB. I currently have 20000 rows that I need to add to a table, but based on what I have read, it seems I can only write at most 25 rows at a time using BatchWriteItem with 25 WriteRequests. Is it possible to increase this? How can I write more than 25 rows at a time? It currently takes about 15 minutes to write all 20000 rows. Thank you.

You can only send up to 25 items in a single BatchWriteItem request, but you can send as many BatchWriteItem requests as you like at one time. Assuming you've provisioned enough write throughput, you should be able to speed things up significantly by splitting those 20k rows between multiple threads/processes/hosts and pushing them to the database in parallel.
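
For illustration, here is a minimal sketch of that approach with the Node.js aws-sdk v2 DocumentClient (the region and item shape are placeholders, and error/UnprocessedItems handling is omitted for brevity):

// Minimal sketch: split the items into 25-item batches and send the
// BatchWriteItem requests in parallel.
const AWS = require('aws-sdk');
const db = new AWS.DynamoDB.DocumentClient({ region: 'us-east-1' });

async function writeAll(table, items) {
  const batches = [];
  for (let i = 0; i < items.length; i += 25) {
    batches.push(items.slice(i, i + 25));
  }
  await Promise.all(
    batches.map((batch) =>
      db.batchWrite({
        RequestItems: { [table]: batch.map((Item) => ({ PutRequest: { Item } })) },
      }).promise()
    )
  );
}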

It is probably heavyweight for a dataset this small, but you can also use AWS Data Pipeline to ingest data from S3. It essentially automates the process of creating a Hadoop cluster that pulls the data out of S3 and sends it to DynamoDB in a series of parallel BatchWriteItem requests.

I was looking for some code to do this with the JavaScript SDK. I couldn't find any, so I put it together myself. I hope this helps someone else!

function multiWrite(table, data, cb) {
    var AWS = require('aws-sdk');
    var db = new AWS.DynamoDB.DocumentClient({region: 'us-east-1'});

    // Build the batches
    var batches = [];
    var current_batch = [];
    var item_count = 0;
    for (var i = 0; i < data.length; i++) {
        // Add the item to the current batch
        item_count++;
        current_batch.push({
            PutRequest: {
                Item: data[i]
            }
        });
        // If we've added 25 items, add the current batch to the batches array
        // and reset it
        if (item_count % 25 === 0) {
            batches.push(current_batch);
            current_batch = [];
        }
    }
    // Add the last, partial batch if it has any leftover records
    // (a full batch of 25 was already pushed and reset inside the loop)
    if (current_batch.length > 0) batches.push(current_batch);

    // Handler for the database operations
    var completed_requests = 0;
    var errors = false;
    function handler(request) {
        return function(err, data) {
            // Increment the completed requests
            completed_requests++;

            // Record the first error we encounter
            errors = errors || err;

            // Log the error if we got one
            if(err) {
                console.error(JSON.stringify(err, null, 2));
                console.error("Request that caused database error:");
                console.error(JSON.stringify(request, null, 2));
            }

            // Make the callback if we've completed all the requests
            if(completed_requests == batches.length) {
                cb(errors);
            }
        }
    }

    // Make the requests
    var params;
    for (var j = 0; j < batches.length; j++) {
        // Items go in the params.RequestItems[table] array,
        // formatted as {PutRequest: {Item: ITEM_OBJECT}}
        params = { RequestItems: {} };
        params.RequestItems[table] = batches[j];

        // Perform the batchWrite operation
        db.batchWrite(params, handler(params));
    }
}
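
One caveat that applies to this snippet (and most of the ones below): when DynamoDB throttles part of a batch, batchWrite reports the rejected items in the response's UnprocessedItems field instead of raising an error, so those items need to be resubmitted. A rough retry sketch, using the same callback style as above (a real implementation should cap the retries and back off exponentially):

// Resubmit UnprocessedItems until the batch is fully written
function batchWriteWithRetry(db, params, cb) {
    db.batchWrite(params, function(err, data) {
        if (err) return cb(err);
        if (data.UnprocessedItems && Object.keys(data.UnprocessedItems).length > 0) {
            return batchWriteWithRetry(db, { RequestItems: data.UnprocessedItems }, cb);
        }
        cb(null, data);
    });
}
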
// docClient (a DocumentClient instance) and TABLES are assumed to be
// defined elsewhere in the module
var _ = require('lodash');
var async = require('async');

function putInHistory(data, cb) {
  // Split the data into chunks of 25 items (the BatchWriteItem limit)
  var arrayOfArray25 = _.chunk(data, 25);
  async.every(arrayOfArray25, function(arrayOf25, callback) {
    var params = {
      RequestItems: {
        [TABLES.historyTable]: []
      }
    };
    arrayOf25.forEach(function(item) {
      params.RequestItems[TABLES.historyTable].push({
        PutRequest: {
          Item: item
        }
      });
    });
    docClient.batchWrite(params, function(err, data) {
      if (err) {
        console.log(err);
        callback(err);
      } else {
        console.log(data);
        callback(null, true);
      }
    });
  }, function(err, result) {
    if (err) {
      cb(err);
    } else {
      cb(null, { allWritten: !!result });
    }
  });
}

You can use lodash's _.chunk to make chunks of 25 items out of the array, and then use the async library's each/every method to perform a batchWrite on each 25-element chunk.
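
A hypothetical invocation (listOfItems is a placeholder for your array of item objects) would look like:

putInHistory(listOfItems, function(err, result) {
  if (err) return console.error(err);
  console.log(result); // { allWritten: true }
});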

Building on @Geerek's answer, here is the same solution as a Lambda function:

exports.handler = (event, context, callback) => {
  console.log(`EVENT: ${JSON.stringify(event)}`);

  var AWS = require('aws-sdk');

  AWS.config.update({ region: process.env.REGION })

  var docClient = new AWS.DynamoDB.DocumentClient();

  // The items and table name arrive on the event; completion is reported
  // through the Lambda callback (a function can't be passed in a JSON event)
  const { data, table } = event

  // Build the batches
  var batches = [];
  var current_batch = [];
  var item_count = 0;

  for (var i = 0; i < data.length; i++) {
    // Add the item to the current batch
    item_count++
    current_batch.push({
      PutRequest: {
        Item: data[i],
      },
    })
    // If we've added 25 items, add the current batch to the batches array
    // and reset it
    if (item_count % 25 === 0) {
      batches.push(current_batch)
      current_batch = []
    }
  }

  // Add the last, partial batch if it has any leftover records
  if (current_batch.length > 0) {
    batches.push(current_batch)
  }

  // Handler for the database operations
  var completed_requests = 0
  var errors = false

  function handler (request) {

    console.log('in the handler: ', request)

    return function (err, data) {
      // Increment the completed requests
      completed_requests++;

      // Record the first error we encounter
      errors = errors || err;

      // Log the error if we got one
      if (err) {
        console.error(JSON.stringify(err, null, 2));
        console.error("Request that caused database error:");
        console.error(JSON.stringify(request, null, 2));
      }

      // Invoke the Lambda callback exactly once, after every batch has
      // completed; calling it per batch would end the invocation as soon
      // as the first response came back
      if (completed_requests === batches.length) {
        if (errors) {
          callback(errors);
        } else {
          callback(null, { batchesWritten: batches.length });
        }
      }
    }
  }

  // Make the requests
  var params
  for (var j = 0; j < batches.length; j++) {
    // Items go in the params.RequestItems[table] array,
    // formatted as {PutRequest: {Item: ITEM_OBJECT}}
    params = { RequestItems: {} }
    params.RequestItems[table] = batches[j]

    console.log('before db.batchWrite: ', params)

    // Perform the batchWrite operation
    docClient.batchWrite(params, handler(params))
  }
};

I wrote an npm package that should work as a drop-in replacement for the batchWrite method; you just need to pass the dynamoDB instance as the first argument and things should work: https://www.npmjs.com/package/batch-write-all

Check out the example in the project's readme:

// Use below instead of this: dynamodb.batchWrite(params).promise();
batchWriteAll(dynamodb, params).promise();

Using the aws cli and aws-vault, this is what I did.

Let's say you have the following file (data.json) with 1000 rows

{ "PutRequest": { "Item": { "PKey": { "S": "1" }, "SKey": { "S": "A" }}}},
{ "PutRequest": { "Item": { "PKey": { "S": "2" }, "SKey": { "S": "B" }}}},
{ "PutRequest": { "Item": { "PKey": { "S": "3" }, "SKey": { "S": "C" }}}},
... to 1000

and you need to split it into chunk files of 25 rows each, since batch-write-item accepts at most 25 put or delete requests per call.

I used the following C# code in LINQPad to generate the .sh file and the JSON chunks, so they can be inserted into DynamoDB with the aws cli:

void Main()
{
    var sourcePath = @"D:\data\whereYourMainJsonFileIsLocated\";
    var sourceFilePath = @"data.json";

    var awsVaultProfileName = "dev";
    var env = "dev";
    var tableName = "dynamodb-table-name";

    var lines = System.IO.File.ReadAllLines(sourcePath + sourceFilePath);

    var destinationPath = Path.Combine(sourcePath, env);
    var destinationChunkPath = Path.Combine(sourcePath, env, "chunks");

    if (!System.IO.Directory.Exists(destinationChunkPath))
        System.IO.Directory.CreateDirectory(destinationChunkPath);

    System.Text.StringBuilder shString = new System.Text.StringBuilder();

    // Page through the source file 25 lines at a time (the
    // batch-write-item limit) and write each page to its own chunk file
    for (int i = 0; i < lines.Count(); i = i + 25)
    {
        var pagedLines = lines.Skip(i).Take(25).Distinct().ToList();

        System.Text.StringBuilder sb = new System.Text.StringBuilder();
        sb.AppendLine("{");
        sb.AppendLine($"  \"{tableName}\": [");

        foreach (var element in pagedLines)
        {
            // Strip the trailing comma from the last request in the chunk
            if (element == pagedLines.Last())
                sb.AppendLine(element.Substring(0, element.Length - 1));
            else
                sb.AppendLine(element);
        }

        sb.AppendLine("]");
        sb.AppendLine("}");

        var fileName = $"chunk{i / 25}.json";
        System.IO.File.WriteAllText(Path.Combine(destinationChunkPath, fileName), sb.ToString(), Encoding.Default);

        shString.AppendLine($@"aws-vault.exe exec {awsVaultProfileName} -- aws dynamodb batch-write-item --request-items file://chunks/{fileName}");
    }

    System.IO.File.WriteAllText(Path.Combine(destinationPath, $"{tableName}-{env}.sh"), shString.ToString(), Encoding.Default);
}

The result will be chunk files such as chunk0.json, chunk1.json, and so on:

{
  "dynamodb-table-name": [
    { "PutRequest": { "Item": { "PKey": { "S": "1" }, "SKey": { "S": "A" }}}},
    { "PutRequest": { "Item": { "PKey": { "S": "2" }, "SKey": { "S": "B" }}}},
    { "PutRequest": { "Item": { "PKey": { "S": "3" }, "SKey": { "S": "C" }}}}
  ]
}

and the .sh file:

aws-vault.exe exec dev -- aws dynamodb batch-write-item --request-items file://chunks/chunk0.json
aws-vault.exe exec dev -- aws dynamodb batch-write-item --request-items file://chunks/chunk1.json
aws-vault.exe exec dev -- aws dynamodb batch-write-item --request-items file://chunks/chunk2.json

Finally, run the .sh file and all the data ends up in the table. Check each command's output: any throttled writes come back under UnprocessedItems and would need to be resubmitted.
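
Another variation on the same idea: build the items, chunk them with Array.prototype.reduce, and send one batchWrite per chunk via Promise.all. Here dynamoClient is a DocumentClient exported from a local module, and persons is the target table: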

const { dynamoClient } = require("./resources/db");
const { v4: uuid } = require("uuid");

const batchWriteLooper = async () => {
  // Build 2000 sample items
  let array = [];
  for (let i = 0; i < 2000; i++) {
    array.push({
      PutRequest: {
        Item: {
          personId: uuid(),
          name: `Person ${i}`,
          age: Math.floor(Math.random() * 100),
          gender: "Male",
          // Store dates as ISO strings; the DocumentClient cannot
          // marshal Date objects by default
          createdAt: new Date().toISOString(),
          updatedAt: new Date().toISOString(),
        },
      },
    });
  }

  var perChunk = 20; // items per chunk (anything up to the 25-item limit works)
  var result = array.reduce((resultArray, item, index) => {
    const chunkIndex = Math.floor(index / perChunk);

    if (!resultArray[chunkIndex]) {
      resultArray[chunkIndex] = []; // start a new chunk
    }
    resultArray[chunkIndex].push(item);
    return resultArray;
  }, []);

  // Fire one batchWrite per chunk, all in parallel
  await Promise.all(
    result.map((chunk) => {
      const params = {
        RequestItems: {
          "persons": chunk,
        },
      };
      return dynamoClient.batchWrite(params).promise();
    })
  );
  console.log("done");
};

batchWriteLooper().catch(console.error);