使用 AWS Cli 访问 DynamoDB 中的 AWS Kinesis 表

Access to AWS Kinesis tables in DynamoDB using AWS Cli

我是 AWS Kinesis 的新手。尝试学习如何使用 AWS DynamoDB 构建分布式应用程序。有人可以告诉我如何使用 AWS Cli 访问我在 AWS Kinesis 中的流使用的 DynamoDB 中的表吗?我可以查询表格吗?

当然可以。 Kinesis-stream 使用 Amazon Dynamodb table 进行消费者补偿管理。

  • 您的 table 将与您的消费者名称(通过 KCL 定义)同名。你可以通过控制台看到所有 consumer_offset_tables (https://console.aws.amazon.com/dynamodb/home)

  • consumer 在多个实例中可以是 运行,但只有一个实例 (THE leaseOwner) 可以从一个分区(或 Kinesis 称之为碎片)中消费。如果此消费者失败,同一消费者的另一个实例将接管并从 checkpoint.

  • 继续处理
  • 检查点是最后处理的事件
  • 消费者实例正在处理的分片称为 leaseKey,它对于 table.

  • 是唯一的
  • Dynamodb 中基于键值的文档的数据结构如下,

其中

"S" - Char array

"N" - Number

{
  "leaseOwner": {
    "S": "SmartConsumerStream_Consumer-192.168.1.83"
  },
  "checkpoint": {
    "S": "49570630332110756564477900867375857710984404992079691778"
  },
  "checkpointSubSequenceNumber": {
    "N": "0"
  },
  "leaseCounter": {
    "N": "16"
  },
  "leaseKey": {
    "S": "shardId-000000000000"
  },
  "ownerSwitchesSinceCheckpoint": {
    "N": "0"
  }
}
  • 您可以使用 Dynamodb API 获取给定 partitionKeyleaseKey 的当前偏移量。您只能通过 leaseKey 查询,因为那是 table 中的索引键。它由 Kinesis-stream 自己创建。

  • 您可以使用stream-driver I'm writing at here它为您提供了非常非常容易获得消费者偏移量的界面。

  • 这里有 kinesis-stream tests 可能也有帮助。

综上所述,使用JAVAapi

获取消费者偏移量

-- 将 aws 凭据放入 ~/.aws/credentials

public Map<String, String> getConsumerPosition() {
    DynamoDB dynamoDB = new DynamoDB(getOffsetConnection()); //
    Table consumerOffsetTable = dynamoDB.getTable("your_consumer_id");

    Map<String, Object> leaseOwner = consumerOffsetTable.getItem("leaseKey", "shardId-000000000000").asMap();

    return new HashMap<String, String>(){{
        put(leaseOwner.get("leaseKey").toString(), leaseOwner.get("checkpoint").toString());
    }};
}

public AmazonDynamoDB getOffsetConnection() {
    AmazonDynamoDBClient dynamoDB = new AmazonDynamoDBClient(getAuthProfileCredentials(), getHttpConfiguration());

    return dynamoDB;
}

private ProfileCredentialsProvider getAuthProfileCredentials() {
    return new ProfileCredentialsProvider("your-aws-auth-profile_in_~/.aws/credentials");
}


private ClientConfiguration getHttpConfiguration() {

    ClientConfiguration clientConfiguration = new ClientConfiguration();

    return clientConfiguration;
}

希望对您有所帮助,如果我可以通过其他方式提供帮助,请告诉我。