How to write sparse Global Secondary Index rows in AWS CDK 2.0?
I'm trying to implement something similar to this using the AWS CDK, where I have a table that is written to every ~30 minutes and an aggregator function that sums up the values for each day. Raw writes to the table will have the columns {player, timestamp, skills, activities}. I'd like a sparse GSI to aggregate these per day, so those rows would have the columns {player, date, skills, activities}.

Here is my CDK code:
```python
from aws_cdk import RemovalPolicy, Stack
from aws_cdk import aws_dynamodb as ddb
from aws_cdk import aws_lambda as _lambda
from aws_cdk import aws_lambda_event_sources as lambda_event_sources
from constructs import Construct


class TrackerStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        """Construct a TrackerStack."""
        super().__init__(scope, construct_id, **kwargs)
        table = ddb.Table(
            self,
            "GranularTable",
            partition_key=ddb.Attribute(name="player", type=ddb.AttributeType.STRING),
            sort_key=ddb.Attribute(name="timestamp", type=ddb.AttributeType.STRING),
            encryption=ddb.TableEncryption.AWS_MANAGED,
            read_capacity=5,
            write_capacity=5,
            removal_policy=RemovalPolicy.DESTROY,
            stream=ddb.StreamViewType.NEW_IMAGE,
        )
        table.add_global_secondary_index(
            index_name="DailyAggregate",
            partition_key=ddb.Attribute(name="player", type=ddb.AttributeType.STRING),
            sort_key=ddb.Attribute(name="date", type=ddb.AttributeType.STRING),
            read_capacity=3,
            write_capacity=3,
        )
        aggregation_lambda = _lambda.Function(
            self,
            "DailyAggregatorLambda",
            handler="aggregator.handler",
            code=_lambda.Code.from_asset("lambda/aggregator"),
            runtime=_lambda.Runtime.PYTHON_3_8,
            environment={"TABLE_NAME": table.table_name},
        )
        table.grant_read_write_data(aggregation_lambda)
        aggregation_lambda.add_event_source(
            lambda_event_sources.DynamoEventSource(
                table,
                starting_position=_lambda.StartingPosition.TRIM_HORIZON,
                batch_size=1,
            )
        )
```
Here is my lambda code:
```python
import logging
import os

import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

ddb = boto3.resource("dynamodb")
table = ddb.Table(os.environ["TABLE_NAME"])


def _timestamp_to_date(timestamp):
    return timestamp.split()[0]


def _image_map(_map):
    return _map["M"]


def _image_num(_map):
    return _map["N"]


def _image_str(_map):
    return _map["S"]


def handler(event, context):
    event_name = event["Records"][0]["eventName"]
    event_source = event["Records"][0]["eventSource"]
    logger.info(f"Processing Event '{event_name}' from source '{event_source}'.")
    new_image = event["Records"][0]["dynamodb"]["NewImage"]
    logger.info(f"Received image: {new_image}")
    if event_name == "INSERT":
        player_id = _image_str(new_image["player"])
        timestamp = _image_str(new_image["timestamp"])
        date = _timestamp_to_date(timestamp)
        # Increment divisor
        logger.debug(f"Incrementing divisor for {player_id}:{date}")
        table.update_item(
            Key={"player": player_id, "date": date},
            UpdateExpression="ADD divisor :incr",
            ExpressionAttributeValues={":incr": 1},
        )
```
When I write to the table, the aggregator is invoked correctly, but it fails to write to the new global secondary index:
```
[ERROR] ClientError: An error occurred (ValidationException) when calling the UpdateItem operation: The provided key element does not match the schema
Traceback (most recent call last):
  File "/var/task/aggregator.py", line 47, in handler
    table.update_item(
  File "/var/runtime/boto3/resources/factory.py", line 520, in do_action
    response = action(self, *args, **kwargs)
  File "/var/runtime/boto3/resources/action.py", line 83, in __call__
    response = getattr(parent.meta.client, operation_name)(*args, **params)
  File "/var/runtime/botocore/client.py", line 386, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/var/runtime/botocore/client.py", line 705, in _make_api_call
    raise error_class(parsed_response, operation_name)
```
This makes sense, because the date attribute is not included in the rows written to the raw table. However, it doesn't look like the Table API (reference) offers an option to specify AttributeDefinitions. I tried writing an empty "date" column to the table when creating records so that it would be inferred in the schema, but got the following error (this error is for writing an empty string; a similar error occurs when writing a null value):
```
[ERROR] ClientError: An error occurred (ValidationException) when calling the PutItem operation: One or more parameter values are not valid. A value specified for a secondary index key is not supported. The AttributeValue for a key attribute cannot contain an empty string value. IndexName: DailyAggregate, IndexKey: date
Traceback (most recent call last):
  File "/var/task/get_and_parse_hiscores.py", line 47, in handler
    table.put_item(Item=payload)
  File "/var/runtime/boto3/resources/factory.py", line 520, in do_action
    response = action(self, *args, **kwargs)
  File "/var/runtime/boto3/resources/action.py", line 83, in __call__
    response = getattr(parent.meta.client, operation_name)(*args, **params)
  File "/var/runtime/botocore/client.py", line 386, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/var/runtime/botocore/client.py", line 705, in _make_api_call
    raise error_class(parsed_response, operation_name)
```
Is there a way to achieve this functionality using these tools?
Edit: While the Table API doesn't let users specify the schema, the CfnTable API does (reference). I tried to achieve this with CfnTable:
```python
# (imports as in the original stack above)


class TrackerStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        """Construct a TrackerStack."""
        super().__init__(scope, construct_id, **kwargs)
        cfn_table = ddb.CfnTable(
            self,
            "GranularCfnTable",
            # CloudFormation requires an AttributeDefinition for every key
            # attribute used by the table or any of its indexes.
            attribute_definitions=[
                ddb.CfnTable.AttributeDefinitionProperty(
                    attribute_name="player",
                    attribute_type="S",
                ),
                ddb.CfnTable.AttributeDefinitionProperty(
                    attribute_name="timestamp",
                    attribute_type="S",
                ),
                ddb.CfnTable.AttributeDefinitionProperty(
                    attribute_name="date",
                    attribute_type="S",
                ),
            ],
            key_schema=[
                ddb.CfnTable.KeySchemaProperty(
                    attribute_name="player", key_type="HASH"
                ),
                ddb.CfnTable.KeySchemaProperty(
                    attribute_name="timestamp", key_type="RANGE"
                ),
            ],
            global_secondary_indexes=[
                ddb.CfnTable.GlobalSecondaryIndexProperty(
                    index_name="DailyAggregate",
                    key_schema=[
                        ddb.CfnTable.KeySchemaProperty(
                            attribute_name="player", key_type="HASH"
                        ),
                        ddb.CfnTable.KeySchemaProperty(
                            attribute_name="date", key_type="RANGE"
                        ),
                    ],
                    projection=ddb.CfnTable.ProjectionProperty(projection_type="ALL"),
                    provisioned_throughput=ddb.CfnTable.ProvisionedThroughputProperty(
                        read_capacity_units=3,
                        write_capacity_units=3,
                    ),
                )
            ],
            provisioned_throughput=ddb.CfnTable.ProvisionedThroughputProperty(
                read_capacity_units=5,
                write_capacity_units=5,
            ),
            sse_specification=ddb.CfnTable.SSESpecificationProperty(sse_enabled=True),
            stream_specification=ddb.CfnTable.StreamSpecificationProperty(
                stream_view_type="NEW_IMAGE"
            ),
        )
        cfn_table.apply_removal_policy(RemovalPolicy.DESTROY)
        table = ddb.Table.from_table_arn(self, "GranularTable", cfn_table.attr_arn)
        aggregation_lambda = _lambda.Function(
            self,
            "DailyAggregatorLambda",
            handler="aggregator.handler",
            code=_lambda.Code.from_asset("lambda/aggregator"),
            runtime=_lambda.Runtime.PYTHON_3_8,
            environment={
                "TABLE_NAME": table.table_name,
            },
        )
        table.grant_read_write_data(aggregation_lambda)
        aggregation_lambda.add_event_source(
            lambda_event_sources.DynamoEventSource(
                table,
                starting_position=_lambda.StartingPosition.TRIM_HORIZON,
                batch_size=1,
            )
        )
```
However, cdk synth fails with the following error. I'm having some trouble reconciling the Level 1 CloudFormation APIs with the Level 2 CDK APIs.

```
jsii.errors.JSIIError: DynamoDB Streams must be enabled on the table TrackerStack/GranularTable
```
You're doing great on the table design and on aggregating with stream events. Many people struggle to get that far. A couple of issues need sorting out to get things working, and the good news is that the fixes involve removing complexity from your current setup.
[Edit] First things first: your update operation is failing not because of CDK or schema problems, but because the update_item is missing the required SK timestamp field. Dynamo expects a unique value for the primary key, and you supplied only the player value, not the timestamp. The index SK field date is not required - that's the "sparse" in sparse index!
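To illustrate, here is one way the corrected write could look. The `DAILY#` sort-key convention and the explicit SET of the date attribute are assumptions for this sketch, not part of the original code; the point is that the Key must contain both base-table key attributes, while the non-key date attribute is what opts the row into the sparse GSI:

```python
# Hypothetical aggregate row: the base table still needs both key attributes,
# so a synthetic sort key ("DAILY#<date>") is reserved for daily aggregates.
# Setting the non-key "date" attribute is what makes the row appear in the
# sparse DailyAggregate GSI; raw rows without "date" stay out of the index.
table.update_item(
    Key={"player": player_id, "timestamp": f"DAILY#{date}"},
    UpdateExpression="ADD divisor :incr SET #d = :date",
    ExpressionAttributeNames={"#d": "date"},  # "date" is a DynamoDB reserved word
    ExpressionAttributeValues={":incr": 1, ":date": date},
)
```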
Next, the "schema". DynamoDB is almost schemaless - almost, because it requires an explicitly defined simple or composite primary key (PK or PK+SK). The L1 construct's CfnTable.KeySchemaProperty and AttributeDefinition properties set these, but it's much easier to use the L2 Table's partition_key and sort_key (and their index equivalents), which do the same thing.
Finally, a thought on table design. With a composite-key pattern (which the OP's link also uses), you can achieve the daily-score pattern without a GSI at all. You can get player1's most recent daily score by querying PK=player1 AND begins_with(SK, "Daily") (with Limit=1 and ScanIndexForward=False); see the sketch after the table below.
| PK      | SK             |
| ------- | -------------- |
| player1 | PlayerInfo     |
| player1 | Daily#20211214 |
| player1 | Daily#20211215 |
| player2 | PlayerInfo     |
| player2 | Daily#20211214 |
| player2 | Daily#20211215 |
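A minimal boto3 sketch of that query, assuming the layout above (the table name is illustrative):

```python
import boto3
from boto3.dynamodb.conditions import Key

table = boto3.resource("dynamodb").Table("TrackerTable")  # name is illustrative

# Latest daily row for player1: walk the sort key in descending order
# and stop at the first "Daily#..." match.
response = table.query(
    KeyConditionExpression=Key("PK").eq("player1") & Key("SK").begins_with("Daily"),
    ScanIndexForward=False,  # newest first
    Limit=1,
)
items = response["Items"]
latest_daily = items[0] if items else None
```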
With all that said, back to your questions:

> How to write sparse Global Secondary Index rows in AWS CDK 2.0?

You don't. You define the GSI and its keys in the CDK, but you use the SDK/Console/etc. to actually write the rows.*

> Is there a way to achieve this functionality using these tools?

Yes. Fix the query, roll the Table construct back to L2, and everything will be in place for you.

* You could seed initial rows with a CDK Custom Resource (sketched below), but that's an advanced nice-to-have, not a must.
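Purely to illustrate that footnote, here is a hedged sketch of seeding one row with the CDK's AwsCustomResource. The item values are made up, and the construct is assumed to live inside the stack next to the L2 `table` from above:

```python
from aws_cdk import custom_resources as cr

# One-off putItem executed at deploy time (assumes `self` is the Stack
# and `table` is the L2 Table defined earlier in the stack).
cr.AwsCustomResource(
    self,
    "SeedPlayerRow",
    on_create=cr.AwsSdkCall(
        service="DynamoDB",
        action="putItem",
        parameters={
            "TableName": table.table_name,
            "Item": {
                "player": {"S": "player1"},
                "timestamp": {"S": "2021-12-15 00:00:00"},
            },
        },
        physical_resource_id=cr.PhysicalResourceId.of("SeedPlayerRow"),
    ),
    policy=cr.AwsCustomResourcePolicy.from_sdk_calls(
        resources=[table.table_arn]
    ),
)
```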