使用 AWS Cli 访问 DynamoDB 中的 AWS Kinesis 表
Access to AWS Kinesis tables in DynamoDB using AWS Cli
我是 AWS Kinesis 的新手。尝试学习如何使用 AWS DynamoDB 构建分布式应用程序。有人可以告诉我如何使用 AWS Cli 访问我在 AWS Kinesis 中的流使用的 DynamoDB 中的表吗?我可以查询表格吗?
当然可以。 Kinesis-stream 使用 Amazon Dynamodb table 进行消费者补偿管理。
您的 table 将与您的消费者名称(通过 KCL 定义)同名。你可以通过控制台看到所有 consumer_offset_tables (https://console.aws.amazon.com/dynamodb/home)
consumer 在多个实例中可以是 运行,但只有一个实例 (THE leaseOwner
) 可以从一个分区(或 Kinesis 称之为碎片)中消费。如果此消费者失败,同一消费者的另一个实例将接管并从 checkpoint
.
继续处理
- 检查点是最后处理的事件
消费者实例正在处理的分片称为 leaseKey
,它对于 table.
是唯一的
Dynamodb 中基于键值的文档的数据结构如下,
其中
"S" - Char array
"N" - Number
{
"leaseOwner": {
"S": "SmartConsumerStream_Consumer-192.168.1.83"
},
"checkpoint": {
"S": "49570630332110756564477900867375857710984404992079691778"
},
"checkpointSubSequenceNumber": {
"N": "0"
},
"leaseCounter": {
"N": "16"
},
"leaseKey": {
"S": "shardId-000000000000"
},
"ownerSwitchesSinceCheckpoint": {
"N": "0"
}
}
您可以使用 Dynamodb API 获取给定 partitionKey
或 leaseKey
的当前偏移量。您只能通过 leaseKey
查询,因为那是 table 中的索引键。它由 Kinesis-stream 自己创建。
您可以使用stream-driver I'm writing at here,它为您提供了非常非常容易获得消费者偏移量的界面。
这里有 kinesis-stream tests 可能也有帮助。
综上所述,使用JAVAapi
获取消费者偏移量
-- 将 aws 凭据放入 ~/.aws/credentials
public Map<String, String> getConsumerPosition() {
DynamoDB dynamoDB = new DynamoDB(getOffsetConnection()); //
Table consumerOffsetTable = dynamoDB.getTable("your_consumer_id");
Map<String, Object> leaseOwner = consumerOffsetTable.getItem("leaseKey", "shardId-000000000000").asMap();
return new HashMap<String, String>(){{
put(leaseOwner.get("leaseKey").toString(), leaseOwner.get("checkpoint").toString());
}};
}
public AmazonDynamoDB getOffsetConnection() {
AmazonDynamoDBClient dynamoDB = new AmazonDynamoDBClient(getAuthProfileCredentials(), getHttpConfiguration());
return dynamoDB;
}
private ProfileCredentialsProvider getAuthProfileCredentials() {
return new ProfileCredentialsProvider("your-aws-auth-profile_in_~/.aws/credentials");
}
private ClientConfiguration getHttpConfiguration() {
ClientConfiguration clientConfiguration = new ClientConfiguration();
return clientConfiguration;
}
希望对您有所帮助,如果我可以通过其他方式提供帮助,请告诉我。
我是 AWS Kinesis 的新手。尝试学习如何使用 AWS DynamoDB 构建分布式应用程序。有人可以告诉我如何使用 AWS Cli 访问我在 AWS Kinesis 中的流使用的 DynamoDB 中的表吗?我可以查询表格吗?
当然可以。 Kinesis-stream 使用 Amazon Dynamodb table 进行消费者补偿管理。
您的 table 将与您的消费者名称(通过 KCL 定义)同名。你可以通过控制台看到所有 consumer_offset_tables (https://console.aws.amazon.com/dynamodb/home)
consumer 在多个实例中可以是 运行,但只有一个实例 (THE
leaseOwner
) 可以从一个分区(或 Kinesis 称之为碎片)中消费。如果此消费者失败,同一消费者的另一个实例将接管并从checkpoint
. 继续处理
- 检查点是最后处理的事件
消费者实例正在处理的分片称为
leaseKey
,它对于 table. 是唯一的
Dynamodb 中基于键值的文档的数据结构如下,
其中
"S" - Char array
"N" - Number
{
"leaseOwner": {
"S": "SmartConsumerStream_Consumer-192.168.1.83"
},
"checkpoint": {
"S": "49570630332110756564477900867375857710984404992079691778"
},
"checkpointSubSequenceNumber": {
"N": "0"
},
"leaseCounter": {
"N": "16"
},
"leaseKey": {
"S": "shardId-000000000000"
},
"ownerSwitchesSinceCheckpoint": {
"N": "0"
}
}
您可以使用 Dynamodb API 获取给定
partitionKey
或leaseKey
的当前偏移量。您只能通过leaseKey
查询,因为那是 table 中的索引键。它由 Kinesis-stream 自己创建。您可以使用stream-driver I'm writing at here,它为您提供了非常非常容易获得消费者偏移量的界面。
这里有 kinesis-stream tests 可能也有帮助。
综上所述,使用JAVAapi
获取消费者偏移量-- 将 aws 凭据放入 ~/.aws/credentials
public Map<String, String> getConsumerPosition() {
DynamoDB dynamoDB = new DynamoDB(getOffsetConnection()); //
Table consumerOffsetTable = dynamoDB.getTable("your_consumer_id");
Map<String, Object> leaseOwner = consumerOffsetTable.getItem("leaseKey", "shardId-000000000000").asMap();
return new HashMap<String, String>(){{
put(leaseOwner.get("leaseKey").toString(), leaseOwner.get("checkpoint").toString());
}};
}
public AmazonDynamoDB getOffsetConnection() {
AmazonDynamoDBClient dynamoDB = new AmazonDynamoDBClient(getAuthProfileCredentials(), getHttpConfiguration());
return dynamoDB;
}
private ProfileCredentialsProvider getAuthProfileCredentials() {
return new ProfileCredentialsProvider("your-aws-auth-profile_in_~/.aws/credentials");
}
private ClientConfiguration getHttpConfiguration() {
ClientConfiguration clientConfiguration = new ClientConfiguration();
return clientConfiguration;
}
希望对您有所帮助,如果我可以通过其他方式提供帮助,请告诉我。