并行执行 DynamoDB 查询(全局二级索引的 BatchGetItems)
Executing DynamoDB queries in parallel (BatchGetItems for Global Secondary Index)
这里的想法是 运行 多个 DynamoDB 并行查询,因为查询是 运行 通过 GSI 进行的。截至目前 BatchGetItems doesn't support querying over Indexes,推荐的方法是并行查询数据。我正在使用带有 wg 的 go routines 来并行处理例程的执行。
函数的输入是一个带有 ID 的字符串数组,输出是 ID 的属性。
当函数在本地运行时,没有问题,但是当函数在AWS-Lambda上运行时,返回的数据不断增长;
即;输入 2 项应输出 2 项。如果函数在 AWS-Lambda 上测试,
- 第一次函数returns 2 项
- 第二次 returns 4 项(相同的项重复 2 次)
- 第三次 returns 6 项(相同的项重复 4 次)
等等。这是代码片段。每次 lambda 为 运行 时,是否有未正确处理的问题导致 lambda 输出额外的数据集?
package main
import (
"context"
"fmt"
"os"
"sync"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
)
//Final Output Interface
var bulkOutput []interface{}
func exitWithError(err error) {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
//LambdaInputJSON input for the lambda handler
type LambdaInputJSON struct {
Ids []string `json:"ids,omitempty"`
}
//HandleRequest : Lambda entry point
func HandleRequest(ctx context.Context, data LambdaInputJSON) ([]interface{}, error) {
return DynamoDBBatchGetRecords(data), nil
}
func main() {
lambda.Start(HandleRequest)
}
func DynamoDBBatchGetRecords(a LambdaInputJSON) []interface{} {
var wg sync.WaitGroup
var mutex = &sync.Mutex{}
iterations := len(a.Ids)
wg.Add(iterations)
for i := 0; i < iterations; i++ {
go QueryOutput(a.Ids[i], &wg, mutex)
}
wg.Wait()
return bulkOutput
}
//QueryOutput GoRoutine
func QueryOutput(data string, wg *sync.WaitGroup, mtx *sync.Mutex) {
var outputData []interface{}
defer wg.Done()
sess, err := session.NewSession(&aws.Config{
Region: aws.String("aws-region"),
})
if err != nil {
exitWithError(fmt.Errorf("failed to make Query API call, %v", err))
}
ddb := dynamodb.New(sess)
queryInput := &dynamodb.QueryInput{
Limit: aws.Int64(1),
TableName: aws.String("table-name"),
IndexName: aws.String("gsi-index"),
ScanIndexForward: aws.Bool(false),
ConsistentRead: aws.Bool(false),
KeyConditions: map[string]*dynamodb.Condition{
"column_name": {
ComparisonOperator: aws.String("EQ"),
AttributeValueList: []*dynamodb.AttributeValue{
{
S: aws.String(data),
},
},
},
},
}
output, err := ddb.Query(queryInput)
if err != nil {
exitWithError(fmt.Errorf("Failed to make Query API call, %v", err))
}
err = dynamodbattribute.UnmarshalListOfMaps(output.Items, &outputData)
if err != nil {
exitWithError(fmt.Errorf("Failed to unmarshal Query result items, %v", err))
}
mtx.Lock()
bulkOutput = append(bulkOutput, outputData)
mtx.Unlock()
}
根据 documentation,全局变量独立于您的 Lambda 函数的处理程序代码。这导致缓冲区随着时间的推移而增加。
修正后的参考粘贴在下面。
package main
import (
"context"
"fmt"
"os"
"sync"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
)
func exitWithError(err error) {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
//HandleRequest : Lambda entry point
func HandleRequest(ctx context.Context, data LambdaInputJSON) ([]interface{}, error) {
output := DynamoDBBatchGetRecords(data)
return output, nil
}
func main() {
lambda.Start(HandleRequest)
}
func DynamoDBBatchGetRecords(a LambdaInputJSON) []interface{} {
var dataOut []interface{}
var wg = &sync.WaitGroup{}
var mtx = &sync.Mutex{}
iterations := len(a.Ids)
wg.Add(iterations)
for i := 0; i < i; i++ {
go func(i int) {
defer wg.Done()
var outputData []interface{}
sess, err := session.NewSession(&aws.Config{
Region: aws.String("aws-region"),
})
if err != nil {
exitWithError(fmt.Errorf("failed to make Query API call, %v", err))
}
ddb := dynamodb.New(sess)
queryInput := &dynamodb.QueryInput{
Limit: aws.Int64(1),
TableName: aws.String("table"),
IndexName: aws.String("index"),
ScanIndexForward: aws.Bool(false),
ConsistentRead: aws.Bool(false),
KeyConditions: map[string]*dynamodb.Condition{
"index-column": {
ComparisonOperator: aws.String("EQ"),
AttributeValueList: []*dynamodb.AttributeValue{
{
S: aws.String(a.Ids[i]),
},
},
},
},
}
output, err := ddb.Query(queryInput)
if err != nil {
exitWithError(fmt.Errorf("E1 failed to make Query API call, %v", err))
}
err = dynamodbattribute.UnmarshalListOfMaps(output.Items, &outputData)
if err != nil {
exitWithError(fmt.Errorf("E2 failed to unmarshal Query result items, %v", err))
}
mtx.Lock()
dataOut = append(dataOut, outputData[0])
mtx.Unlock()
}(i)
}
wg.Wait()
return dataOut
}
这里的想法是 运行 多个 DynamoDB 并行查询,因为查询是 运行 通过 GSI 进行的。截至目前 BatchGetItems doesn't support querying over Indexes,推荐的方法是并行查询数据。我正在使用带有 wg 的 go routines 来并行处理例程的执行。
函数的输入是一个带有 ID 的字符串数组,输出是 ID 的属性。
当函数在本地运行时,没有问题,但是当函数在AWS-Lambda上运行时,返回的数据不断增长;
即;输入 2 项应输出 2 项。如果函数在 AWS-Lambda 上测试,
- 第一次函数returns 2 项
- 第二次 returns 4 项(相同的项重复 2 次)
- 第三次 returns 6 项(相同的项重复 4 次)
等等。这是代码片段。每次 lambda 为 运行 时,是否有未正确处理的问题导致 lambda 输出额外的数据集?
package main
import (
"context"
"fmt"
"os"
"sync"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
)
//Final Output Interface
var bulkOutput []interface{}
func exitWithError(err error) {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
//LambdaInputJSON input for the lambda handler
type LambdaInputJSON struct {
Ids []string `json:"ids,omitempty"`
}
//HandleRequest : Lambda entry point
func HandleRequest(ctx context.Context, data LambdaInputJSON) ([]interface{}, error) {
return DynamoDBBatchGetRecords(data), nil
}
func main() {
lambda.Start(HandleRequest)
}
func DynamoDBBatchGetRecords(a LambdaInputJSON) []interface{} {
var wg sync.WaitGroup
var mutex = &sync.Mutex{}
iterations := len(a.Ids)
wg.Add(iterations)
for i := 0; i < iterations; i++ {
go QueryOutput(a.Ids[i], &wg, mutex)
}
wg.Wait()
return bulkOutput
}
//QueryOutput GoRoutine
func QueryOutput(data string, wg *sync.WaitGroup, mtx *sync.Mutex) {
var outputData []interface{}
defer wg.Done()
sess, err := session.NewSession(&aws.Config{
Region: aws.String("aws-region"),
})
if err != nil {
exitWithError(fmt.Errorf("failed to make Query API call, %v", err))
}
ddb := dynamodb.New(sess)
queryInput := &dynamodb.QueryInput{
Limit: aws.Int64(1),
TableName: aws.String("table-name"),
IndexName: aws.String("gsi-index"),
ScanIndexForward: aws.Bool(false),
ConsistentRead: aws.Bool(false),
KeyConditions: map[string]*dynamodb.Condition{
"column_name": {
ComparisonOperator: aws.String("EQ"),
AttributeValueList: []*dynamodb.AttributeValue{
{
S: aws.String(data),
},
},
},
},
}
output, err := ddb.Query(queryInput)
if err != nil {
exitWithError(fmt.Errorf("Failed to make Query API call, %v", err))
}
err = dynamodbattribute.UnmarshalListOfMaps(output.Items, &outputData)
if err != nil {
exitWithError(fmt.Errorf("Failed to unmarshal Query result items, %v", err))
}
mtx.Lock()
bulkOutput = append(bulkOutput, outputData)
mtx.Unlock()
}
根据 documentation,全局变量独立于您的 Lambda 函数的处理程序代码。这导致缓冲区随着时间的推移而增加。
修正后的参考粘贴在下面。
package main
import (
"context"
"fmt"
"os"
"sync"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
)
func exitWithError(err error) {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
//HandleRequest : Lambda entry point
func HandleRequest(ctx context.Context, data LambdaInputJSON) ([]interface{}, error) {
output := DynamoDBBatchGetRecords(data)
return output, nil
}
func main() {
lambda.Start(HandleRequest)
}
func DynamoDBBatchGetRecords(a LambdaInputJSON) []interface{} {
var dataOut []interface{}
var wg = &sync.WaitGroup{}
var mtx = &sync.Mutex{}
iterations := len(a.Ids)
wg.Add(iterations)
for i := 0; i < i; i++ {
go func(i int) {
defer wg.Done()
var outputData []interface{}
sess, err := session.NewSession(&aws.Config{
Region: aws.String("aws-region"),
})
if err != nil {
exitWithError(fmt.Errorf("failed to make Query API call, %v", err))
}
ddb := dynamodb.New(sess)
queryInput := &dynamodb.QueryInput{
Limit: aws.Int64(1),
TableName: aws.String("table"),
IndexName: aws.String("index"),
ScanIndexForward: aws.Bool(false),
ConsistentRead: aws.Bool(false),
KeyConditions: map[string]*dynamodb.Condition{
"index-column": {
ComparisonOperator: aws.String("EQ"),
AttributeValueList: []*dynamodb.AttributeValue{
{
S: aws.String(a.Ids[i]),
},
},
},
},
}
output, err := ddb.Query(queryInput)
if err != nil {
exitWithError(fmt.Errorf("E1 failed to make Query API call, %v", err))
}
err = dynamodbattribute.UnmarshalListOfMaps(output.Items, &outputData)
if err != nil {
exitWithError(fmt.Errorf("E2 failed to unmarshal Query result items, %v", err))
}
mtx.Lock()
dataOut = append(dataOut, outputData[0])
mtx.Unlock()
}(i)
}
wg.Wait()
return dataOut
}