模拟 AWS 服务和 Lambda 最佳实践
Mocking AWS services and Lambda best practices
我正在研究一个简单的 AWS lambda 函数,该函数由 DynamoDB Streams 事件触发,应该将 REMOVE
事件以外的所有记录转发到 SQS 队列。该功能按预期工作,没有惊喜。
我想编写一个单元测试来测试在 DELETE
事件时不向 SQS 提交任何内容的行为。我首先尝试使用 aws-sdk-mock。正如您在函数代码中看到的那样,我尝试通过在处理程序代码之外初始化 SQS 客户端来遵守 lambda 最佳实践。显然,这会阻止 aws-sdk-mock 模拟 SQS 服务(GitHub 上有一个关于此的问题:https://github.com/dwyl/aws-sdk-mock/issues/206)。
然后我尝试使用 jest 模拟 SQS,这需要更多代码才能正确完成,但我最终遇到了同样的问题,需要将 SQS 的初始化放在里面违反 lambda 最佳实践的处理函数。
如何在让 SQS 客户端 (const sqs: SQS = new SQS()
) 的初始化在处理程序之外 的同时为此函数 编写单元测试?我是在以错误的方式模拟服务,还是更改了处理程序的结构以使其更易于测试?
我知道这个 lambda 函数非常简单,可能不需要单元测试,但我将不得不编写更多具有更复杂逻辑的 lambda,我认为这个非常适合演示问题。
index.ts
import {DynamoDBStreamEvent, DynamoDBStreamHandler} from "aws-lambda";
import SQS = require("aws-sdk/clients/sqs");
import DynamoDB = require("aws-sdk/clients/dynamodb");
const sqs: SQS = new SQS()
export const handleDynamoDbEvent: DynamoDBStreamHandler = async (event: DynamoDBStreamEvent, context, callback) => {
const QUEUE_URL = process.env.TARGET_QUEUE_URL
if (QUEUE_URL.length == 0) {
throw new Error('TARGET_QUEUE_URL not set or empty')
}
await Promise.all(
event.Records
.filter(_ => _.eventName !== "REMOVE")
.map((record) => {
const unmarshalled = DynamoDB.Converter.unmarshall(record.dynamodb.NewImage);
let request: SQS.SendMessageRequest = {
MessageAttributes: {
"EVENT_NAME": {
DataType: "String",
StringValue: record.eventName
}
},
MessageBody: JSON.stringify(unmarshalled),
QueueUrl: QUEUE_URL,
}
return sqs.sendMessage(request).promise()
})
);
}
index.spec.ts
import {DynamoDBRecord, DynamoDBStreamEvent, StreamRecord} from "aws-lambda";
import {AttributeValue} from "aws-lambda/trigger/dynamodb-stream";
import {handleDynamoDbEvent} from "./index";
import {AWSError} from "aws-sdk/lib/error";
import {PromiseResult, Request} from "aws-sdk/lib/request";
import * as SQS from "aws-sdk/clients/sqs";
import {mocked} from "ts-jest/utils";
import DynamoDB = require("aws-sdk/clients/dynamodb");
jest.mock('aws-sdk/clients/sqs', () => {
return jest.fn().mockImplementation(() => {
return {
sendMessage: (params: SQS.Types.SendMessageRequest, callback?: (err: AWSError, data: SQS.Types.SendMessageResult) => void): Request<SQS.Types.SendMessageResult, AWSError> => {
// @ts-ignore
const Mock = jest.fn<Request<SQS.Types.SendMessageResult, AWSError>>(()=>{
return {
promise: (): Promise<PromiseResult<SQS.Types.SendMessageResult, AWSError>> => {
return new Promise<PromiseResult<SQS.SendMessageResult, AWSError>>(resolve => {
resolve(null)
})
}
}
})
return new Mock()
}
}
})
});
describe.only('Handler test', () => {
const mockedSqs = mocked(SQS, true)
process.env.TARGET_QUEUE_URL = 'test'
const OLD_ENV = process.env;
beforeEach(() => {
mockedSqs.mockClear()
jest.resetModules();
process.env = {...OLD_ENV};
});
it('should write INSERT events to SQS', async () => {
console.log('Starting test')
await handleDynamoDbEvent(createEvent(), null, null)
expect(mockedSqs).toHaveBeenCalledTimes(1)
});
})
我将如何处理这个问题的粗略想法:
- 我不会在主函数中执行实际的 SQS sending/manipulation,而是会为消息客户端创建一个接口。像这样:
interface QueueClient {
send(eventName: string, body: string): Promise<any>;
}
- 并创建一个实际的 class 来实现该接口以与 SQS 进行交互:
class SQSQueueClient implements QueueClient {
queueUrl: string
sqs: SQS
constructor() {
this.queueUrl = process.env.TARGET_QUEUE_URL;
if (this.queueUrl.length == 0) {
throw new Error('TARGET_QUEUE_URL not set or empty')
}
this.sqs = new SQS();
}
send(eventName: string, body: string): Promise<any> {
let request: SQS.SendMessageRequest = {
MessageAttributes: {
"EVENT_NAME": {
DataType: "String",
StringValue: eventName
}
},
MessageBody: body,
QueueUrl: this.queueUrl,
}
return this.sqs.sendMessage()
}
}
这 class 了解如何将数据转换为 SQS 格式的详细信息
- 然后我将主要功能分成2个。入口点只是解析队列url,创建一个sqs队列客户端的实际实例并调用
process()
。主要逻辑在process()
const queueClient = new SQSQueueClient();
export const handleDynamoDbEvent: DynamoDBStreamHandler = async (event: DynamoDBStreamEvent, context, callback) => {
return process(queueClient, event);
}
export const process = async (queueClient: QueueClient, event: DynamoDBStreamEvent) => {
return await Promise.all(
event.Records
.filter(_ => _.eventName !== "REMOVE")
.map((record) => {
const unmarshalled = DynamoDB.Converter.unmarshall(record.dynamodb.NewImage);
return queueClient.send(record.eventName, JSON.stringify(unmarshalled));
})
);
}
- 现在可以更轻松地测试
process()
中的主要逻辑。您可以通过手写或使用您喜欢的任何模拟框架来提供实现接口 QueueClient
的模拟实例
- 对于
SQSQueueClient
class,单元测试的好处不大,所以我会更多地依赖集成测试(例如使用localstack之类的东西)
我现在没有实际的 IDE 所以如果这里和那里有语法错误请原谅我
我添加了一个从处理程序函数内部调用的初始化方法。如果之前已经调用它,它会立即 returns,否则将初始化 SQS 客户端。它可以轻松扩展以初始化其他客户端。
这符合 lambda 最佳实践并使测试代码有效。
let sqs: SQS = null
let initialized = false
export const handleDynamoDbEvent: DynamoDBStreamHandler = async (event: DynamoDBStreamEvent, context, callback) => {
init()
const QUEUE_URL = process.env.TARGET_QUEUE_URL
if (QUEUE_URL.length == 0) {
throw new Error('TARGET_QUEUE_URL not set or empty')
}
await Promise.all(
event.Records
.filter(_ => _.eventName !== "REMOVE")
.map((record) => {
const unmarshalled = DynamoDB.Converter.unmarshall(record.dynamodb.NewImage);
let request: SQS.SendMessageRequest = {
MessageAttributes: {
"EVENT_NAME": {
DataType: "String",
StringValue: record.eventName
}
},
MessageBody: JSON.stringify(unmarshalled),
QueueUrl: QUEUE_URL,
}
return sqs.sendMessage(request).promise()
})
);
}
function init() {
if (initialized) {
return
}
console.log('Initializing...')
initialized = true
sqs = new SQS()
}
我正在研究一个简单的 AWS lambda 函数,该函数由 DynamoDB Streams 事件触发,应该将 REMOVE
事件以外的所有记录转发到 SQS 队列。该功能按预期工作,没有惊喜。
我想编写一个单元测试来测试在 DELETE
事件时不向 SQS 提交任何内容的行为。我首先尝试使用 aws-sdk-mock。正如您在函数代码中看到的那样,我尝试通过在处理程序代码之外初始化 SQS 客户端来遵守 lambda 最佳实践。显然,这会阻止 aws-sdk-mock 模拟 SQS 服务(GitHub 上有一个关于此的问题:https://github.com/dwyl/aws-sdk-mock/issues/206)。
然后我尝试使用 jest 模拟 SQS,这需要更多代码才能正确完成,但我最终遇到了同样的问题,需要将 SQS 的初始化放在里面违反 lambda 最佳实践的处理函数。
如何在让 SQS 客户端 (const sqs: SQS = new SQS()
) 的初始化在处理程序之外 的同时为此函数 编写单元测试?我是在以错误的方式模拟服务,还是更改了处理程序的结构以使其更易于测试?
我知道这个 lambda 函数非常简单,可能不需要单元测试,但我将不得不编写更多具有更复杂逻辑的 lambda,我认为这个非常适合演示问题。
index.ts
import {DynamoDBStreamEvent, DynamoDBStreamHandler} from "aws-lambda";
import SQS = require("aws-sdk/clients/sqs");
import DynamoDB = require("aws-sdk/clients/dynamodb");
const sqs: SQS = new SQS()
export const handleDynamoDbEvent: DynamoDBStreamHandler = async (event: DynamoDBStreamEvent, context, callback) => {
const QUEUE_URL = process.env.TARGET_QUEUE_URL
if (QUEUE_URL.length == 0) {
throw new Error('TARGET_QUEUE_URL not set or empty')
}
await Promise.all(
event.Records
.filter(_ => _.eventName !== "REMOVE")
.map((record) => {
const unmarshalled = DynamoDB.Converter.unmarshall(record.dynamodb.NewImage);
let request: SQS.SendMessageRequest = {
MessageAttributes: {
"EVENT_NAME": {
DataType: "String",
StringValue: record.eventName
}
},
MessageBody: JSON.stringify(unmarshalled),
QueueUrl: QUEUE_URL,
}
return sqs.sendMessage(request).promise()
})
);
}
index.spec.ts
import {DynamoDBRecord, DynamoDBStreamEvent, StreamRecord} from "aws-lambda";
import {AttributeValue} from "aws-lambda/trigger/dynamodb-stream";
import {handleDynamoDbEvent} from "./index";
import {AWSError} from "aws-sdk/lib/error";
import {PromiseResult, Request} from "aws-sdk/lib/request";
import * as SQS from "aws-sdk/clients/sqs";
import {mocked} from "ts-jest/utils";
import DynamoDB = require("aws-sdk/clients/dynamodb");
jest.mock('aws-sdk/clients/sqs', () => {
return jest.fn().mockImplementation(() => {
return {
sendMessage: (params: SQS.Types.SendMessageRequest, callback?: (err: AWSError, data: SQS.Types.SendMessageResult) => void): Request<SQS.Types.SendMessageResult, AWSError> => {
// @ts-ignore
const Mock = jest.fn<Request<SQS.Types.SendMessageResult, AWSError>>(()=>{
return {
promise: (): Promise<PromiseResult<SQS.Types.SendMessageResult, AWSError>> => {
return new Promise<PromiseResult<SQS.SendMessageResult, AWSError>>(resolve => {
resolve(null)
})
}
}
})
return new Mock()
}
}
})
});
describe.only('Handler test', () => {
const mockedSqs = mocked(SQS, true)
process.env.TARGET_QUEUE_URL = 'test'
const OLD_ENV = process.env;
beforeEach(() => {
mockedSqs.mockClear()
jest.resetModules();
process.env = {...OLD_ENV};
});
it('should write INSERT events to SQS', async () => {
console.log('Starting test')
await handleDynamoDbEvent(createEvent(), null, null)
expect(mockedSqs).toHaveBeenCalledTimes(1)
});
})
我将如何处理这个问题的粗略想法:
- 我不会在主函数中执行实际的 SQS sending/manipulation,而是会为消息客户端创建一个接口。像这样:
interface QueueClient {
send(eventName: string, body: string): Promise<any>;
}
- 并创建一个实际的 class 来实现该接口以与 SQS 进行交互:
class SQSQueueClient implements QueueClient {
queueUrl: string
sqs: SQS
constructor() {
this.queueUrl = process.env.TARGET_QUEUE_URL;
if (this.queueUrl.length == 0) {
throw new Error('TARGET_QUEUE_URL not set or empty')
}
this.sqs = new SQS();
}
send(eventName: string, body: string): Promise<any> {
let request: SQS.SendMessageRequest = {
MessageAttributes: {
"EVENT_NAME": {
DataType: "String",
StringValue: eventName
}
},
MessageBody: body,
QueueUrl: this.queueUrl,
}
return this.sqs.sendMessage()
}
}
这 class 了解如何将数据转换为 SQS 格式的详细信息
- 然后我将主要功能分成2个。入口点只是解析队列url,创建一个sqs队列客户端的实际实例并调用
process()
。主要逻辑在process()
const queueClient = new SQSQueueClient();
export const handleDynamoDbEvent: DynamoDBStreamHandler = async (event: DynamoDBStreamEvent, context, callback) => {
return process(queueClient, event);
}
export const process = async (queueClient: QueueClient, event: DynamoDBStreamEvent) => {
return await Promise.all(
event.Records
.filter(_ => _.eventName !== "REMOVE")
.map((record) => {
const unmarshalled = DynamoDB.Converter.unmarshall(record.dynamodb.NewImage);
return queueClient.send(record.eventName, JSON.stringify(unmarshalled));
})
);
}
- 现在可以更轻松地测试
process()
中的主要逻辑。您可以通过手写或使用您喜欢的任何模拟框架来提供实现接口QueueClient
的模拟实例 - 对于
SQSQueueClient
class,单元测试的好处不大,所以我会更多地依赖集成测试(例如使用localstack之类的东西)
我现在没有实际的 IDE 所以如果这里和那里有语法错误请原谅我
我添加了一个从处理程序函数内部调用的初始化方法。如果之前已经调用它,它会立即 returns,否则将初始化 SQS 客户端。它可以轻松扩展以初始化其他客户端。
这符合 lambda 最佳实践并使测试代码有效。
let sqs: SQS = null
let initialized = false
export const handleDynamoDbEvent: DynamoDBStreamHandler = async (event: DynamoDBStreamEvent, context, callback) => {
init()
const QUEUE_URL = process.env.TARGET_QUEUE_URL
if (QUEUE_URL.length == 0) {
throw new Error('TARGET_QUEUE_URL not set or empty')
}
await Promise.all(
event.Records
.filter(_ => _.eventName !== "REMOVE")
.map((record) => {
const unmarshalled = DynamoDB.Converter.unmarshall(record.dynamodb.NewImage);
let request: SQS.SendMessageRequest = {
MessageAttributes: {
"EVENT_NAME": {
DataType: "String",
StringValue: record.eventName
}
},
MessageBody: JSON.stringify(unmarshalled),
QueueUrl: QUEUE_URL,
}
return sqs.sendMessage(request).promise()
})
);
}
function init() {
if (initialized) {
return
}
console.log('Initializing...')
initialized = true
sqs = new SQS()
}