如何最大化 DynamoDB 中的 WRU?
How to maximize WRUs in DynamoDB?
我需要每周将记录批量插入 DynamoDB 数据库。为此,我删除了 table,创建了一个具有按需容量的新 table,然后使用 BatchWriteItem 来填充 table。根据 documentation,新创建的 table 具有按需容量可以服务多达 4,000 个 WCU。无论我尝试什么,我最多只能得到 1,487 个 WCU。我尝试了以下方法:
- 在写入之前随机化记录的顺序以避免热分区
- 将记录分成组并在其自己的 Lambda 函数中并行写入每个组
- 运行不同环境下的进程,包括EC2、Lambda、本地(本地比较慢,估计是延迟)
- 将容量类型从按需更改为配备 5 个 RCU 和 10,000 个 WCU 的预置
- 以不同的方式利用 async/await 来尝试最大化吞吐量(我正在使用适用于 .NET 的 AWS SDK)
尽管吞吐量因实验和执行而异,但 1,487 个 WCU 出现的频率足够高,可能具有一定的意义。
我需要做什么才能充分利用我可用的全部 4,000 个 WCU?
你的限制似乎在作者方面,我写了一个小 Python 脚本来创建和加载测试 table。
我们可以看到,DynamoDB 使用 8 个工作进程轻松扩展到 4000 WRU,然后稍微节流,然后再次扩展。为了获得更高的吞吐量,我必须添加更多的编写器进程:
为方便起见,这里是脚本:
import multiprocessing
import typing
import uuid
import boto3
import boto3.dynamodb.conditions as conditions
from botocore.exceptions import ClientError
TABLE = "speed-measurement"
NUMBER_OF_WORKERS = 8
def create_table_if_not_exists(table_name: str):
try:
boto3.client("dynamodb").create_table(
AttributeDefinitions=[{"AttributeName": "PK", "AttributeType": "S"}],
TableName=table_name,
KeySchema=[{"AttributeName": "PK", "KeyType": "HASH"}],
BillingMode="PAY_PER_REQUEST"
)
except ClientError as err:
if err.response["Error"]["Code"] == 'ResourceInUseException':
# Table already exists
pass
else:
raise err
def write_fast(worker_num):
table = boto3.resource("dynamodb").Table(TABLE)
counter = 0
with table.batch_writer() as batch:
while True:
counter += 1
result = batch.put_item(
Item={
"PK": str(uuid.uuid4())
}
)
if counter % 1000 == 0:
print(f"Worker: #{worker_num} Wrote item #{counter}")
def main():
create_table_if_not_exists(TABLE)
with multiprocessing.Pool(NUMBER_OF_WORKERS) as pool:
pool.map(write_fast, range(NUMBER_OF_WORKERS))
if __name__ == "__main__":
main()
只需 运行 使用 Python 3 并在看到所需指标后使用 Ctrl+C 停止它。它将创建一个 table 并在 8 个进程中尽可能快地写入。您也可以增加这个数字。
CloudWatch 图形来源:
{
"metrics": [
[ { "expression": "m2/60", "label": "Write Request Units", "id": "e1", "color": "#2ca02c" } ],
[ "AWS/DynamoDB", "WriteThrottleEvents", "TableName", "speed-measurement", { "yAxis": "right", "id": "m1" } ],
[ ".", "ConsumedWriteCapacityUnits", ".", ".", { "stat": "Sum", "period": 1, "id": "m2", "visible": false } ]
],
"view": "timeSeries",
"stacked": false,
"region": "eu-central-1",
"stat": "Maximum",
"period": 60,
"yAxis": {
"left": {
"label": "Consumed Write Request Units",
"showUnits": false
},
"right": {
"label": "Write Throttle Events",
"showUnits": false
}
},
"annotations": {
"horizontal": [
{
"color": "#9edae5",
"label": "Initial Limit",
"value": 4000,
"fill": "below"
}
]
},
"legend": {
"position": "bottom"
},
"setPeriodToTimeRange": true
}
我需要每周将记录批量插入 DynamoDB 数据库。为此,我删除了 table,创建了一个具有按需容量的新 table,然后使用 BatchWriteItem 来填充 table。根据 documentation,新创建的 table 具有按需容量可以服务多达 4,000 个 WCU。无论我尝试什么,我最多只能得到 1,487 个 WCU。我尝试了以下方法:
- 在写入之前随机化记录的顺序以避免热分区
- 将记录分成组并在其自己的 Lambda 函数中并行写入每个组
- 运行不同环境下的进程,包括EC2、Lambda、本地(本地比较慢,估计是延迟)
- 将容量类型从按需更改为配备 5 个 RCU 和 10,000 个 WCU 的预置
- 以不同的方式利用 async/await 来尝试最大化吞吐量(我正在使用适用于 .NET 的 AWS SDK)
尽管吞吐量因实验和执行而异,但 1,487 个 WCU 出现的频率足够高,可能具有一定的意义。
我需要做什么才能充分利用我可用的全部 4,000 个 WCU?
你的限制似乎在作者方面,我写了一个小 Python 脚本来创建和加载测试 table。
我们可以看到,DynamoDB 使用 8 个工作进程轻松扩展到 4000 WRU,然后稍微节流,然后再次扩展。为了获得更高的吞吐量,我必须添加更多的编写器进程:
为方便起见,这里是脚本:
import multiprocessing
import typing
import uuid
import boto3
import boto3.dynamodb.conditions as conditions
from botocore.exceptions import ClientError
TABLE = "speed-measurement"
NUMBER_OF_WORKERS = 8
def create_table_if_not_exists(table_name: str):
try:
boto3.client("dynamodb").create_table(
AttributeDefinitions=[{"AttributeName": "PK", "AttributeType": "S"}],
TableName=table_name,
KeySchema=[{"AttributeName": "PK", "KeyType": "HASH"}],
BillingMode="PAY_PER_REQUEST"
)
except ClientError as err:
if err.response["Error"]["Code"] == 'ResourceInUseException':
# Table already exists
pass
else:
raise err
def write_fast(worker_num):
table = boto3.resource("dynamodb").Table(TABLE)
counter = 0
with table.batch_writer() as batch:
while True:
counter += 1
result = batch.put_item(
Item={
"PK": str(uuid.uuid4())
}
)
if counter % 1000 == 0:
print(f"Worker: #{worker_num} Wrote item #{counter}")
def main():
create_table_if_not_exists(TABLE)
with multiprocessing.Pool(NUMBER_OF_WORKERS) as pool:
pool.map(write_fast, range(NUMBER_OF_WORKERS))
if __name__ == "__main__":
main()
只需 运行 使用 Python 3 并在看到所需指标后使用 Ctrl+C 停止它。它将创建一个 table 并在 8 个进程中尽可能快地写入。您也可以增加这个数字。
CloudWatch 图形来源:
{
"metrics": [
[ { "expression": "m2/60", "label": "Write Request Units", "id": "e1", "color": "#2ca02c" } ],
[ "AWS/DynamoDB", "WriteThrottleEvents", "TableName", "speed-measurement", { "yAxis": "right", "id": "m1" } ],
[ ".", "ConsumedWriteCapacityUnits", ".", ".", { "stat": "Sum", "period": 1, "id": "m2", "visible": false } ]
],
"view": "timeSeries",
"stacked": false,
"region": "eu-central-1",
"stat": "Maximum",
"period": 60,
"yAxis": {
"left": {
"label": "Consumed Write Request Units",
"showUnits": false
},
"right": {
"label": "Write Throttle Events",
"showUnits": false
}
},
"annotations": {
"horizontal": [
{
"color": "#9edae5",
"label": "Initial Limit",
"value": 4000,
"fill": "below"
}
]
},
"legend": {
"position": "bottom"
},
"setPeriodToTimeRange": true
}