AWS AppSync - 将全局二级索引添加到 DynamoDB 并使用 GSI 排序键进行分页

AWS AppSync - Adding Global Secondary Index to DynamoDB and pagination using GSI sort key

我正在尝试创建一个结构来列出 post 的注释,postId 排序为 w.r.t。他们 lastChangeTime 降序。

模式中的模型在下面共享。

type Comment {
  id: ID!
  postId: String!
  user: String!
  lastChangeTime: String
  commentBody: String
}

它已经有了支持 DynamoDB table 和通用的 CRUD 解析器。 而id字段是table中的主键。

我打算按如下方式构建查询:

{
  "version": "2017-02-28",
  "operation" : "Query",
  "index" : "postId-index",
  "query" : {
    "expression": "post = :postId",
    "expressionValues" : {
      ":postId" : {
        "S" : "${ctx.args.postId}"
      }
    }
  },
  "limit": $util.defaultIfNull($ctx.args.first, 20),
  "nextToken": $util.toJson($util.defaultIfNullOrEmpty($ctx.args.after, null)),
  "scanIndexForward": false
}

要使其正常工作,我应该如何在 postId(即 postId-index)上添加 全球二级索引 (GSI)

定义的时候在lastChangeTime上加上排序键就可以了吗?或者 lastChangeTime 字段需要自己单独的索引进行排序?

这很容易。您可以通过两种不同的方式来完成,或者您可以同时使用这两种方式以获得更好的灵活性。 (如果你已经解决了我希望它能帮助别人)。

通过这种方式,您可以使用查询参数动态设置 sortDirection。

详细代码如下。在此之前请注意这一点。

  1. 第一点是您的评论类型 - 您正在使用

    type Comment {
      id: ID!
      postId: String!
      ## rest of your type definition
    }
    

这不是设置链接到 Post 的评论类型的最佳方式。

更好的方法是:

type Comment {
    postID: ID!         ## select this as Primary key in DataSource in AppSync console
    commentID: String!    ## select this as Sort key in DataSource in AppSync console
    ## rest of your type definition
}

如果您这样做,您的 DynamoDB table 将具有类似于如下所示的结构 (from this AWS webpage)。

(在您的情况下,UserId 将是 PostId,而 GameTitle 将是 CommentID)

这样,因为所有的评论都会被记录在彼此旁边(在相同的 PostId 下)AppSync 响应时间会快得多。

AppSync docs page中他们也使用了这个例子:

  1. 正如@Lisa M Shon 提到的,您可以在 CommentTable 上启动 GSI,其中 PostId 是分区键,addedTime 是排序键。如果您想使用下面提供的解析器,请将其命名为 'postID-addedTime-index'。请确保您 select 'Number' 在 GSI 中添加了时间。

然后在您的架构中您可以定义以下类型:

type Comment {
    postID: ID!
    commentID: String!
    content: String!
    addedTime: Int!
}

type CommentConnection {
    items: [Comment]
    nextToken: String
}

type Post {
    id: ID!
    postContent: String!
    addedTime: Int!
    ## Option 1. Gets Post details with all related Comments. 
    ## If 'startFromTime' is provided it will fetch all Comments starting from that timestamp.
    ## If 'startFromTime' is not provided it will fetch all Comments.
    comments(
        filter: TableCommentFilterInput,
        sortDirection: SortDirection,
        startFromTime: Int,
        limit: Int,
        nextToken: String
    ): CommentConnection
}

type Query {
    ## Option 2. It will fetch Comments only for a given PostId.
    ## If 'startFromTime' is provided it will fetch all Comments starting from that timestamp.
    ## If 'startFromTime' is not provided it will fetch all Comments.
    postCommentsByAddTime(
        postID: String!,
        startFromTime: Int!,
        sortDirection: SortDirection,
        filter: TableCommentFilterInput,
        count: Int,
        nextToken: String
    ): PaginatedComments
   ## your other queries
}

    ## rest of your schema definition

您可以同时使用选项 1 和选项 2,并且两者都使用。

完整的模式代码在这里(展开下面的代码片段):

type Comment {
 postID: ID!
 commentID: String!
 content: String!
 addedTime: Int!
}

type CommentConnection {
 items: [Comment]
 nextToken: String
}

input CreateCommentInput {
 postID: ID!
 commentID: String!
 content: String!
 addedTime: Int!
}

input CreatePostInput {
 postContent: String!
 addedTime: Int!
}

input DeleteCommentInput {
 postID: ID!
 commentID: String!
}

input DeletePostInput {
 id: ID!
}

type Mutation {
 createComment(input: CreateCommentInput!): Comment
 updateComment(input: UpdateCommentInput!): Comment
 deleteComment(input: DeleteCommentInput!): Comment
 createPost(input: CreatePostInput!): Post
 updatePost(input: UpdatePostInput!): Post
 deletePost(input: DeletePostInput!): Post
}

type PaginatedComments {
 items: [Comment!]!
 nextToken: String
}

type Post {
 id: ID!
 postContent: String!
 addedTime: Int!
 comments(
  filter: TableCommentFilterInput,
  sortDirection: SortDirection,
  startFromTime: Int,
  limit: Int,
  nextToken: String
 ): CommentConnection
}

type PostConnection {
 items: [Post]
 nextToken: String
}

type Query {
 getComment(postID: ID!, commentID: String!): Comment
 listComments(filter: TableCommentFilterInput, limit: Int, nextToken: String): CommentConnection
 getPost(id: ID!): Post
 listPosts(filter: TablePostFilterInput, limit: Int, nextToken: String): PostConnection
 postCommentsByAddTime(
  postID: String!,
  startFromTime: Int!,
  sortDirection: SortDirection,
  filter: TableCommentFilterInput,
  count: Int,
  nextToken: String
 ): PaginatedComments
}

enum SortDirection {
 ASC
 DESC
}

type Subscription {
 onCreateComment(
  postID: ID,
  commentID: String,
  content: String,
  addedTime: Int
 ): Comment
  @aws_subscribe(mutations: ["createComment"])
 onUpdateComment(
  postID: ID,
  commentID: String,
  content: String,
  addedTime: Int
 ): Comment
  @aws_subscribe(mutations: ["updateComment"])
 onDeleteComment(
  postID: ID,
  commentID: String,
  content: String,
  addedTime: Int
 ): Comment
  @aws_subscribe(mutations: ["deleteComment"])
 onCreatePost(id: ID, postContent: String, addedTime: Int): Post
  @aws_subscribe(mutations: ["createPost"])
 onUpdatePost(id: ID, postContent: String, addedTime: Int): Post
  @aws_subscribe(mutations: ["updatePost"])
 onDeletePost(id: ID, postContent: String, addedTime: Int): Post
  @aws_subscribe(mutations: ["deletePost"])
}

input TableBooleanFilterInput {
 ne: Boolean
 eq: Boolean
}

input TableCommentFilterInput {
 postID: TableIDFilterInput
 commentID: TableStringFilterInput
 content: TableStringFilterInput
 addedTime: TableIntFilterInput
}

input TableFloatFilterInput {
 ne: Float
 eq: Float
 le: Float
 lt: Float
 ge: Float
 gt: Float
 contains: Float
 notContains: Float
 between: [Float]
}

input TableIDFilterInput {
 ne: ID
 eq: ID
 le: ID
 lt: ID
 ge: ID
 gt: ID
 contains: ID
 notContains: ID
 between: [ID]
 beginsWith: ID
}

input TableIntFilterInput {
 ne: Int
 eq: Int
 le: Int
 lt: Int
 ge: Int
 gt: Int
 contains: Int
 notContains: Int
 between: [Int]
}

input TablePostFilterInput {
 id: TableIDFilterInput
 postContent: TableStringFilterInput
 addedTime: TableIntFilterInput
}

input TableStringFilterInput {
 ne: String
 eq: String
 le: String
 lt: String
 ge: String
 gt: String
 contains: String
 notContains: String
 between: [String]
 beginsWith: String
}

input UpdateCommentInput {
 postID: ID!
 commentID: String!
 content: String
 addedTime: Int
}

input UpdatePostInput {
 id: ID!
 postContent: String
 addedTime: Int
}

schema {
 query: Query
 mutation: Mutation
 subscription: Subscription
}

  1. (与选项 1 相关)。在右侧面板的 AppSync 控制台的 Schema 页面上找到 Post 并在 'comments(...): CommentConnection' 对面单击 'Attach',添加 'CommentTable' 作为源并在 VTL 中添加以下解析器代码:

请求映射模板中:

    #set( $startFromTime = $util.defaultIfNull($context.args.startFromTime, 0) )
    {
        "version" : "2017-02-28",
        "operation" : "Query",
        "index" : "postID-addedTime-index",
        "query" : {
          "expression": "postID = :postID and addedTime > :startFrom",
            "expressionValues" : {
              ":postID" : { "S" : "$context.source.id" },
              ":startFrom" : { "N" : "$startFromTime" }
            }
        },
        "scanIndexForward":   #if( $context.args.sortDirection )
          #if( $context.args.sortDirection == "ASC" )
              true
          #else
              false
          #end
        #else
            true
        #end,

        #if( ${context.arguments.count} )
            ,"limit": ${context.arguments.count}
        #end
        #if( ${context.arguments.nextToken} )
            ,"nextToken": "${context.arguments.nextToken}"
        #end
    }

响应映射模板中:

{
    "items": $utils.toJson($context.result.items)
    #if( ${context.result.nextToken} )
        ,"nextToken": "${context.result.nextToken}"
    #end
}
  1. (与选项 2 相关)。在右侧面板的 AppSync 控制台的 Schema 页面上找到 Query 并在 'postCommentsByAddTime(...): PaginatedComments' 对面单击 'Attach',添加 CommentTable 作为数据源并在 VTL 中添加以下 Resolver 代码:

请求映射模板中:

{
    "version" : "2017-02-28",
    "operation" : "Query",
    "index" : "postID-addedTime-index",
    "query" : {
      "expression": "postID = :postID and addedTime > :startFrom",
        "expressionValues" : {
          ":postID" : { "S" : "${context.arguments.postID}" },
          ":startFrom" : { "N" : "${context.arguments.startFromTime}" }
        }
    }
    #if( ${context.arguments.count} )
        ,"limit": ${context.arguments.count}
    #end
    #if( ${context.arguments.nextToken} )
        ,"nextToken": "${context.arguments.nextToken}"
    #end
}

响应映射模板中:

{
    "items": $utils.toJson($context.result.items)
    #if( ${context.result.nextToken} )
        ,"nextToken": "${context.result.nextToken}"
    #end
}

就是这样。

现在您可以使用以下所有查询:

query ListPosts {
  listPosts{
    items {
      id
      postContent
      ## all below arguments are nullable
      comments(startFromTime: 121111112222, count: 4
      ## default sortDirection is ASC, you can change it this way
      ## sortDirection: DESC
    ) {
        items {
          postID
          commentID
          content
          addedTime
        }
      }
    }
  }
}

query GetPost {
  getPost(id: "6548e596-d1ed-4203-a32f-52cfab8c9b20") {
    id
    comments (
    ## you can also add all three or any or none of these
    ## sortDirection: DESC,
    ## startFromTime: 189283212122
    ## count: 5
    ) {
        items {
        postID
        commentID
        content
        addedTime
      }
  }
  }
}

query GetCommentsByTime {
  postCommentsByAddTime(postID: "6548e596-d1ed-4203-a32f-52cfab8c9b20", startFromTime: 12423455352342, count: 2) {
    items {
      postID
      commentID
      content
      addedTime
    }
  }
}