发布测试 spring 云 SQS 侦听器
Issue testing spring cloud SQS Listener
环境
- Spring 开机:1.5.13.RELEASE
- 云:Edgware.SR3
- 云 AWS:1.2.2.RELEASE
- Java 8
- OSX 10.13.4
问题
我正在尝试为 SQS 编写集成测试。
我在 TCP/4576
上有一个带有 SQS 运行 的本地 运行 localstack docker 容器
在我的测试代码中,我定义了一个端点设置为本地 4576 的 SQS 客户端,并且可以成功连接并创建队列、发送消息和删除队列。我还可以使用 SQS 客户端接收消息并获取我发送的消息。
我的问题是,如果我删除手动接收消息的代码以允许另一个组件接收消息,似乎什么也没有发生。我有一个 spring 组件,注释如下:
听众
@Component
public class MyListener {
@SqsListener(value = "my_queue", deletionPolicy = ON_SUCCESS)
public void receive(final MyMsg msg) {
System.out.println("GOT THE MESSAGE: "+ msg.toString());
}
}
测试
@RunWith(SpringRunner.class)
@SpringBootTest(properties = "spring.profiles.active=test")
public class MyTest {
@Autowired
private AmazonSQSAsync amazonSQS;
@Autowired
private SimpleMessageListenerContainer container;
private String queueUrl;
@Before
public void setUp() {
queueUrl = amazonSQS.createQueue("my_queue").getQueueUrl();
}
@After
public void tearDown() {
amazonSQS.deleteQueue(queueUrl);
}
@Test
public void name() throws InterruptedException {
amazonSQS.sendMessage(new SendMessageRequest(queueUrl, "hello"));
System.out.println("isRunning:" + container.isRunning());
System.out.println("isActive:" + container.isActive());
System.out.println("isRunningOnQueue:" + container.isRunning("my_queue"));
Thread.sleep(30_000);
System.out.println("GOT MESSAGE: " + amazonSQS.receiveMessage(queueUrl).getMessages().size());
}
@TestConfiguration
@EnableSqs
public static class SQSConfiguration {
@Primary
@Bean(destroyMethod = "shutdown")
public AmazonSQSAsync amazonSQS() {
final AwsClientBuilder.EndpointConfiguration endpoint = new AwsClientBuilder.EndpointConfiguration("http://127.0.0.1:4576", "eu-west-1");
return new AmazonSQSBufferedAsyncClient(AmazonSQSAsyncClientBuilder
.standard()
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("key", "secret")))
.withEndpointConfiguration(endpoint)
.build());
}
}
}
在测试日志中我看到:
o.s.c.a.m.listener.QueueMessageHandler : 1 message handler methods found on class MyListener: {public void MyListener.receive(MyMsg)=org.springframework.cloud.aws.messaging.listener.QueueMessageHandler$MappingInformation@1cd4082a}
2018-05-31 22:50:39.582 INFO 16329 ---
o.s.c.a.m.listener.QueueMessageHandler : Mapped "org.springframework.cloud.aws.messaging.listener.QueueMessageHandler$MappingInformation@1cd4082a" onto public void MyListener.receive(MyMsg)
其次是:
isRunning:true
isActive:true
isRunningOnQueue:false
GOT MESSAGE: 1
这表明在发送消息之间的 30 秒暂停中,容器没有接收它,而当我手动轮询消息时,它在队列中并且我可以使用它。
我的问题是,为什么没有调用侦听器,为什么 isRunningOnQueue:false
行表明它没有为该队列自动启动?
请注意,我还尝试将自己的 SimpleMessageListenerContainer
bean 设置为 autostart 显式设置为 true(无论如何都是默认值)并且没有观察到行为发生变化。我认为由 @EnableSqs
设置的 org.springframework.cloud.aws.messaging.config.annotation.SqsConfiguration#simpleMessageListenerContainer
应该配置一个自动启动的 SimpleMessageListenerContainer
应该轮询我的消息。
我也设置了
logging.level.org.apache.http=DEBUG
logging.level.org.springframework.cloud=DEBUG
在我的测试属性中,可以看到 HTTP 调用创建队列、发送消息和删除等,但没有要接收的 HTTP 调用(测试结束时我的手动调用除外)。
经过一番修改后我弄明白了。
即使简单的消息容器工厂设置为不自动启动,它似乎还是会进行初始化,这涉及到判断队列是否存在。
在这种情况下,队列是在我的设置方法测试中创建的 - 但遗憾的是,这是在设置 spring 上下文之后,这意味着发生了异常。
我通过简单地将队列创建移动到 SQS 客户端的上下文创建(这发生在创建消息容器之前)来解决这个问题。即:
@Bean(destroyMethod = "shutdown")
public AmazonSQSAsync amazonSQS() {
final AwsClientBuilder.EndpointConfiguration endpoint = new AwsClientBuilder.EndpointConfiguration("http://localhost:4576", "eu-west-1");
final AmazonSQSBufferedAsyncClient client = new AmazonSQSBufferedAsyncClient(AmazonSQSAsyncClientBuilder
.standard()
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("dummyKey", "dummySecret")))
.withEndpointConfiguration(endpoint)
.build());
client.createQueue("test-queue");
return client;
}
环境
- Spring 开机:1.5.13.RELEASE
- 云:Edgware.SR3
- 云 AWS:1.2.2.RELEASE
- Java 8
- OSX 10.13.4
问题
我正在尝试为 SQS 编写集成测试。
我在 TCP/4576
在我的测试代码中,我定义了一个端点设置为本地 4576 的 SQS 客户端,并且可以成功连接并创建队列、发送消息和删除队列。我还可以使用 SQS 客户端接收消息并获取我发送的消息。
我的问题是,如果我删除手动接收消息的代码以允许另一个组件接收消息,似乎什么也没有发生。我有一个 spring 组件,注释如下:
听众
@Component
public class MyListener {
@SqsListener(value = "my_queue", deletionPolicy = ON_SUCCESS)
public void receive(final MyMsg msg) {
System.out.println("GOT THE MESSAGE: "+ msg.toString());
}
}
测试
@RunWith(SpringRunner.class)
@SpringBootTest(properties = "spring.profiles.active=test")
public class MyTest {
@Autowired
private AmazonSQSAsync amazonSQS;
@Autowired
private SimpleMessageListenerContainer container;
private String queueUrl;
@Before
public void setUp() {
queueUrl = amazonSQS.createQueue("my_queue").getQueueUrl();
}
@After
public void tearDown() {
amazonSQS.deleteQueue(queueUrl);
}
@Test
public void name() throws InterruptedException {
amazonSQS.sendMessage(new SendMessageRequest(queueUrl, "hello"));
System.out.println("isRunning:" + container.isRunning());
System.out.println("isActive:" + container.isActive());
System.out.println("isRunningOnQueue:" + container.isRunning("my_queue"));
Thread.sleep(30_000);
System.out.println("GOT MESSAGE: " + amazonSQS.receiveMessage(queueUrl).getMessages().size());
}
@TestConfiguration
@EnableSqs
public static class SQSConfiguration {
@Primary
@Bean(destroyMethod = "shutdown")
public AmazonSQSAsync amazonSQS() {
final AwsClientBuilder.EndpointConfiguration endpoint = new AwsClientBuilder.EndpointConfiguration("http://127.0.0.1:4576", "eu-west-1");
return new AmazonSQSBufferedAsyncClient(AmazonSQSAsyncClientBuilder
.standard()
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("key", "secret")))
.withEndpointConfiguration(endpoint)
.build());
}
}
}
在测试日志中我看到:
o.s.c.a.m.listener.QueueMessageHandler : 1 message handler methods found on class MyListener: {public void MyListener.receive(MyMsg)=org.springframework.cloud.aws.messaging.listener.QueueMessageHandler$MappingInformation@1cd4082a} 2018-05-31 22:50:39.582 INFO 16329 ---
o.s.c.a.m.listener.QueueMessageHandler : Mapped "org.springframework.cloud.aws.messaging.listener.QueueMessageHandler$MappingInformation@1cd4082a" onto public void MyListener.receive(MyMsg)
其次是:
isRunning:true
isActive:true
isRunningOnQueue:false
GOT MESSAGE: 1
这表明在发送消息之间的 30 秒暂停中,容器没有接收它,而当我手动轮询消息时,它在队列中并且我可以使用它。
我的问题是,为什么没有调用侦听器,为什么 isRunningOnQueue:false
行表明它没有为该队列自动启动?
请注意,我还尝试将自己的 SimpleMessageListenerContainer
bean 设置为 autostart 显式设置为 true(无论如何都是默认值)并且没有观察到行为发生变化。我认为由 @EnableSqs
设置的 org.springframework.cloud.aws.messaging.config.annotation.SqsConfiguration#simpleMessageListenerContainer
应该配置一个自动启动的 SimpleMessageListenerContainer
应该轮询我的消息。
我也设置了
logging.level.org.apache.http=DEBUG
logging.level.org.springframework.cloud=DEBUG
在我的测试属性中,可以看到 HTTP 调用创建队列、发送消息和删除等,但没有要接收的 HTTP 调用(测试结束时我的手动调用除外)。
经过一番修改后我弄明白了。
即使简单的消息容器工厂设置为不自动启动,它似乎还是会进行初始化,这涉及到判断队列是否存在。
在这种情况下,队列是在我的设置方法测试中创建的 - 但遗憾的是,这是在设置 spring 上下文之后,这意味着发生了异常。
我通过简单地将队列创建移动到 SQS 客户端的上下文创建(这发生在创建消息容器之前)来解决这个问题。即:
@Bean(destroyMethod = "shutdown")
public AmazonSQSAsync amazonSQS() {
final AwsClientBuilder.EndpointConfiguration endpoint = new AwsClientBuilder.EndpointConfiguration("http://localhost:4576", "eu-west-1");
final AmazonSQSBufferedAsyncClient client = new AmazonSQSBufferedAsyncClient(AmazonSQSAsyncClientBuilder
.standard()
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("dummyKey", "dummySecret")))
.withEndpointConfiguration(endpoint)
.build());
client.createQueue("test-queue");
return client;
}