在单个查询中将多个 JSON 对象插入 Dynamo DB
Insert multiple JSON objects to Dynamo DB in a single query
我的设置 - AWS Lambda、AWS Dynamo DB、nodejs 和无服务器。
我正在从一些 API 中获取一个 JSON 对象数组,并试图将它们插入我的 Dynamo 数据库中。到目前为止,我得到的唯一解决方案是遍历数组并对每个对象进行 DocumentClient() PUT 调用。
我的数据有大约 1000 个对象,写入吞吐量设置为 1。因此,在插入大约 300 条记录并出现此错误后,即使该解决方案对我来说也失败了 - The level of configured provisioned throughput for the table was exceeded.
可以在一个查询中完成吗?
每个项目插入将消耗 1 WCU(每 KB),没有其他办法。
您可以使用效率更高的 batchWrite 而不是单独插入。但是,您仍然需要将插入率调整为 table 写入吞吐量。
此外,请注意您的插入在开始时使用了 300 seconds of burst capacity(您的前 300 次插入),但在这些被消耗之后,您只能使用 1 个 WCU 进行 1 write/s。
这是一个批量插入的例子,它将在块之间等待并重新安排失败的插入:
async function batchedAsync({list, callback, chunkSize=10, msDelayBetweenChunks=0}) {
const emptyList = new Array(Math.ceil(list.length / chunkSize)).fill();
const clonedList = list.slice(0);
const chunks = emptyList.map(_ => clonedList.splice(0, chunkSize));
for (let chunk of chunks) {
if (msDelayBetweenChunks) {
await new Promise(resolve => setTimeout(resolve, msDelayBetweenChunks));
}
await callback(chunk, chunks);
}
}
async function writeItems(tableName, chunk, chunks) {
const {UnprocessedItems} = await documentClient.batchWrite({
RequestItems: {
[tableName]: chunk.map(item => {
return {PutRequest: {Item: item}};
})
}
}).promise();
if (UnprocessedItems.length) {
chunks.push(UnprocessedItems);
}
}
// Example
batchedAsync({
list: itemsToInsert,
callback: writeItems.bind(null, myTableName),
chunkSize: 2, // adjust to provisioned throughput. Max 25 (batchWrite dynamodb limit)
msDelayBetweenChunks: 1000
});
我的设置 - AWS Lambda、AWS Dynamo DB、nodejs 和无服务器。
我正在从一些 API 中获取一个 JSON 对象数组,并试图将它们插入我的 Dynamo 数据库中。到目前为止,我得到的唯一解决方案是遍历数组并对每个对象进行 DocumentClient() PUT 调用。
我的数据有大约 1000 个对象,写入吞吐量设置为 1。因此,在插入大约 300 条记录并出现此错误后,即使该解决方案对我来说也失败了 - The level of configured provisioned throughput for the table was exceeded.
可以在一个查询中完成吗?
每个项目插入将消耗 1 WCU(每 KB),没有其他办法。
您可以使用效率更高的 batchWrite 而不是单独插入。但是,您仍然需要将插入率调整为 table 写入吞吐量。
此外,请注意您的插入在开始时使用了 300 seconds of burst capacity(您的前 300 次插入),但在这些被消耗之后,您只能使用 1 个 WCU 进行 1 write/s。
这是一个批量插入的例子,它将在块之间等待并重新安排失败的插入:
async function batchedAsync({list, callback, chunkSize=10, msDelayBetweenChunks=0}) {
const emptyList = new Array(Math.ceil(list.length / chunkSize)).fill();
const clonedList = list.slice(0);
const chunks = emptyList.map(_ => clonedList.splice(0, chunkSize));
for (let chunk of chunks) {
if (msDelayBetweenChunks) {
await new Promise(resolve => setTimeout(resolve, msDelayBetweenChunks));
}
await callback(chunk, chunks);
}
}
async function writeItems(tableName, chunk, chunks) {
const {UnprocessedItems} = await documentClient.batchWrite({
RequestItems: {
[tableName]: chunk.map(item => {
return {PutRequest: {Item: item}};
})
}
}).promise();
if (UnprocessedItems.length) {
chunks.push(UnprocessedItems);
}
}
// Example
batchedAsync({
list: itemsToInsert,
callback: writeItems.bind(null, myTableName),
chunkSize: 2, // adjust to provisioned throughput. Max 25 (batchWrite dynamodb limit)
msDelayBetweenChunks: 1000
});