DynamoDB: get a single item based on sort key only

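I'm new to DynamoDB and need to process 5M records. Each record has an ID and a status. I need to query the records by status, process each one, and finally update its status.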

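I'm using DynamoDbEnhancedClient, but I can't find an example of how to query on the range (sort) key alone, without the hash key, while avoiding a scan. I tried building a query with a condition and a limit of 1, but it doesn't work.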

Here is what I have.

My Customer model:

@DynamoDbPartitionKey
private String id;
@DynamoDbSecondarySortKey(indexNames = "status")
private String status;
private String name;

Configuration:

@Bean
public DynamoDbEnhancedClient dynamoDbEnhancedClient(){
    return DynamoDbEnhancedClient.builder()
            .dynamoDbClient(dynamoDbClient())
            .extensions(AutoGeneratedTimestampRecordExtension.create())
            .build();
}

My query:

static final TableSchema<Customer> CUSTOMER_TABLE = TableSchema.fromClass(Customer.class);

public Customer findByStatus() {
    DynamoDbTable<Customer> customerTable = dynamoDbEnhancedClient.table("customer", CUSTOMER_TABLE);

    QueryConditional queryConditionalPerPartition = new EqualToConditional(Key.builder().
            partitionValue("status").
            build());


    QueryEnhancedRequest request = QueryEnhancedRequest.builder()
            .limit(1)
            .queryConditional(queryConditionalPerPartition)
            .build();


    PageIterable<Customer> pageIterable = customerTable.query(request);


    Customer customer = pageIterable.stream().findFirst().get().items().get(0);
    return customer;
}

However, that doesn't work. How can I query by status and get just one result? I have no constraints on the table structure and can change it as needed.

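So I finally figured it out: I needed to create a GSI (Global Secondary Index).

When you create the GSI, you define the table's sort key (status) as the index's hash key. A query always needs a hash key value, so querying the index lets you look items up by status alone.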
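To make the idea concrete, here is a minimal in-memory sketch of what the index buys you. It is a toy model built on plain Java collections, not the DynamoDB SDK; `GsiModel` and its method names are made up for illustration. The base table is keyed by id and then status, and the index holds the same items keyed by status and then id.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy in-memory model of a table plus one GSI; this is NOT the DynamoDB SDK. */
final class GsiModel {
    // Base table: id (hash key) -> status (range key) -> payload.
    private final Map<String, TreeMap<String, String>> table = new HashMap<>();
    // Index: status (hash key) -> id (range key) -> payload.
    private final Map<String, TreeMap<String, String>> index = new HashMap<>();

    /** Every write to the table is mirrored into the index. */
    void put(String id, String status, String payload) {
        table.computeIfAbsent(id, k -> new TreeMap<>()).put(status, payload);
        index.computeIfAbsent(status, k -> new TreeMap<>()).put(id, payload);
    }

    /** Without a hash key value, every partition must be visited: a scan. */
    List<String> scanByStatus(String status) {
        List<String> ids = new ArrayList<>();
        for (Map.Entry<String, TreeMap<String, String>> partition : table.entrySet()) {
            if (partition.getValue().containsKey(status)) {
                ids.add(partition.getKey());
            }
        }
        return ids;
    }

    /** In the index, status is the hash key, so a single partition lookup is enough. */
    List<String> queryIndexByStatus(String status, int limit) {
        List<String> ids = new ArrayList<>();
        for (String id : index.getOrDefault(status, new TreeMap<>()).keySet()) {
            if (ids.size() == limit) {
                break;
            }
            ids.add(id);
        }
        return ids;
    }

    public static void main(String[] args) {
        GsiModel model = new GsiModel();
        model.put("c1", "PENDING", "Alice");
        model.put("c2", "DONE", "Bob");
        model.put("c3", "PENDING", "Carol");
        System.out.println(model.queryIndexByStatus("PENDING", 1)); // prints [c1]
    }
}
```

The real service maintains the index for you; the sketch only shows why a lookup by status needs status to be a hash key somewhere.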

Creating the table:

aws dynamodb create-table \
    --table-name customer \
    --key-schema \
        AttributeName=id,KeyType=HASH \
        AttributeName=status,KeyType=RANGE \
    --attribute-definitions \
        AttributeName=id,AttributeType=S \
        AttributeName=status,AttributeType=S \
    --provisioned-throughput \
        ReadCapacityUnits=5,WriteCapacityUnits=5 \
    --table-class STANDARD \
    --global-secondary-indexes '[
        {
            "IndexName": "id-status",
            "KeySchema": [
                {
                    "AttributeName": "status",
                    "KeyType": "HASH"
                },
                {
                    "AttributeName": "id",
                    "KeyType": "RANGE"
                }
            ],
            "Projection": {
                "ProjectionType": "ALL"
            },
            "ProvisionedThroughput": {
                "ReadCapacityUnits": 1,
                "WriteCapacityUnits": 1
            }
        }
    ]'

My model in Java:

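import java.time.Instant;
import software.amazon.awssdk.enhanced.dynamodb.extensions.annotations.DynamoDbAutoGeneratedTimestampAttribute;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbPartitionKey;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSecondaryPartitionKey;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSecondarySortKey;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSortKey;

@DynamoDbBean
public class Customer {

    private String id;
    private String status;
    private Instant created;
    private Instant updated;

    // A bean schema needs a public no-arg constructor and a setter per attribute.
    public Customer() {
    }

    // Table hash key; also the sort key of the "id-status" index.
    @DynamoDbPartitionKey
    @DynamoDbSecondarySortKey(indexNames = "id-status")
    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    // Table sort key; also the hash key of the "id-status" index.
    @DynamoDbSortKey
    @DynamoDbSecondaryPartitionKey(indexNames = "id-status")
    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    // Instant is stored as an ISO-8601 string by the enhanced client's default converter.
    @DynamoDbAutoGeneratedTimestampAttribute
    public Instant getCreated() {
        return created;
    }

    public void setCreated(Instant created) {
        this.created = created;
    }

    @DynamoDbAutoGeneratedTimestampAttribute
    public Instant getUpdated() {
        return updated;
    }

    public void setUpdated(Instant updated) {
        this.updated = updated;
    }
}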

Then query the database:

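import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.core.pagination.sync.SdkIterable;
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedClient;
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbIndex;
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbTable;
import software.amazon.awssdk.enhanced.dynamodb.Key;
import software.amazon.awssdk.enhanced.dynamodb.TableSchema;
import software.amazon.awssdk.enhanced.dynamodb.model.Page;
import software.amazon.awssdk.enhanced.dynamodb.model.QueryConditional;
import software.amazon.awssdk.enhanced.dynamodb.model.QueryEnhancedRequest;

@Service
public class CustomerDAO {

    static final TableSchema<Customer> CUSTOMER_TABLE =
            TableSchema.fromBean(Customer.class);

    @Autowired
    private DynamoDbEnhancedClient dynamoDbEnhancedClient;

    public Customer findByStatus() {
        DynamoDbTable<Customer> customerTable =
                dynamoDbEnhancedClient.table("customer", CUSTOMER_TABLE);

        // Query the GSI, whose hash key is "status", instead of the base table.
        DynamoDbIndex<Customer> secIndex = customerTable.index("id-status");

        QueryConditional queryConditional = QueryConditional
                .keyEqualTo(Key.builder().partitionValue("PENDING").build());

        // An index query returns SdkIterable<Page<Customer>>, not PageIterable.
        SdkIterable<Page<Customer>> results = secIndex.query(
                QueryEnhancedRequest.builder()
                        .queryConditional(queryConditional)
                        .limit(1)
                        .build());

        // Return the first PENDING item, or null if there is none.
        return results.stream()
                .flatMap(page -> page.items().stream())
                .findFirst()
                .orElse(null);
    }
}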
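The original goal was to fetch records by status, process them, and then update the status. Below is a rough sketch of that loop. `CustomerStore`, `InMemoryCustomerStore` and `StatusProcessor` are hypothetical names, not part of the AWS SDK, and the in-memory store only stands in for the table so the loop can be run on its own. In a real implementation `findOneIdByStatus` would presumably wrap the index query above. Note that with this schema `status` is part of the table's primary key, and DynamoDB does not let you modify key attributes in place, so a real `changeStatus` would have to delete the old item and put a new one.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical storage abstraction; not part of the AWS SDK. */
interface CustomerStore {
    Optional<String> findOneIdByStatus(String status);

    void changeStatus(String id, String fromStatus, String toStatus);
}

/** In-memory stand-in for the table, keyed like the schema above: (id, status). */
final class InMemoryCustomerStore implements CustomerStore {
    // Each element is one item's primary key: (id, status).
    private final Set<Map.Entry<String, String>> keys = new LinkedHashSet<>();

    void put(String id, String status) {
        keys.add(new SimpleImmutableEntry<>(id, status));
    }

    long count(String status) {
        return keys.stream().filter(k -> k.getValue().equals(status)).count();
    }

    @Override
    public Optional<String> findOneIdByStatus(String status) {
        // A real store would query the status index here instead of filtering.
        return keys.stream()
                .filter(k -> k.getValue().equals(status))
                .map(Map.Entry::getKey)
                .findFirst();
    }

    @Override
    public void changeStatus(String id, String fromStatus, String toStatus) {
        // status is part of the key, so the "update" is: delete old key, put new key.
        if (!keys.remove(new SimpleImmutableEntry<>(id, fromStatus))) {
            throw new IllegalStateException("no item " + id + " with status " + fromStatus);
        }
        keys.add(new SimpleImmutableEntry<>(id, toStatus));
    }
}

final class StatusProcessor {
    /** Handles PENDING items one at a time until none remain; returns the count. */
    static int drain(CustomerStore store, Consumer<String> work) {
        int processed = 0;
        Optional<String> next;
        while ((next = store.findOneIdByStatus("PENDING")).isPresent()) {
            String id = next.get();
            work.accept(id);
            store.changeStatus(id, "PENDING", "DONE");
            processed++;
        }
        return processed;
    }

    public static void main(String[] args) {
        InMemoryCustomerStore store = new InMemoryCustomerStore();
        store.put("c1", "PENDING");
        store.put("c2", "DONE");
        store.put("c3", "PENDING");
        System.out.println(drain(store, id -> System.out.println("processing " + id)));
    }
}
```

One caveat when porting this to the real table: reads from a global secondary index are eventually consistent, so an item whose status was just changed may briefly still show up under its old status.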