我如何(集成)使用 RabbitMQ 测试 @StreamListener?
How do I (integration) test @StreamListener with RabbitMQ?
我正在尝试为我的侦听器编写一个集成测试 spring 启动应用程序,在测试中将启动该应用程序,然后创建一条消息,将其放入输出通道,等待消息被挑选和加工。
这是我要测试的内容:
package com.example;
import com.netflix.discovery.converters.Auto;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
import org.springframework.web.context.WebApplicationContext;
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.integration.support.management.graph.LinkNode.Type.output;
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("scratch")
public class DemoApplicationTests {
@Component
@EnableBinding(Source.class)
static class TestMessageSource {
@Autowired
private Source source;
public void sendMessage(String message) {
new Thread(() -> {
Message<Greeting> msg = MessageBuilder.withPayload(new Greeting(message)).build();
source.output().send(msg);
}).start();
}
}
static class CounterFakeService implements CounterService {
public int count = 0;
public Greeting greeting;
@Override
public void recordCount(Greeting greeting) {
count++;
this.greeting = greeting;
}
}
private CounterFakeService fakeCounterService;
@Bean
CounterService counterService() {
return fakeCounterService;
}
@Autowired
TestMessageSource messageSource;
@Before
public void before() {
fakeCounterService = new CounterFakeService();
// this.mvc = MockMvcBuilders.webAppContextSetup(this.context).build();
}
@Test
public void doesProcessMessages() throws InterruptedException {
assertThat(fakeCounterService.count).isEqualTo(0);
messageSource.sendMessage("test");
Thread.sleep(5000);
assertThat(fakeCounterService.count).isEqualTo(1);
assertThat(fakeCounterService.greeting.getMessage()).isEqualTo("test");
}
}
并且在application-scratch.properties
中我有输出通道绑定到输入交换
spring.rabbitmq.host=rabbitmq.local.pcfdev.io
spring.rabbitmq.port=5672
spring.rabbitmq.password=i9jbk2o3ingqtkrekgm988bvui
spring.rabbitmq.username=8cf073b0-a2ff-450a-bee5-3954cb6c191f
spring.rabbitmq.virtual-host=5fc33451-4ec0-440e-90a1-6e7ed0c025f9
spring.cloud.stream.bindings.output.destination=input
但是,目前测试失败,因为没有收到任何消息。我应该寻找什么?
编辑 - 完整性的侦听器代码
PS:我已经验证这实际上与另一个应用程序一起工作,向 input
交换发送消息。只是无法让测试工作:(
@RestController
@SpringBootApplication
@EnableDiscoveryClient
@EnableBinding(Sink.class)
public class DemoApplication {
private static Logger log = LoggerFactory.getLogger(DemoApplication.class);
@Autowired
private Config config;
@Autowired
CounterServiceImpl counterService;
@StreamListener(Sink.INPUT)
public void handle(Greeting greeting) {
log.info("in handle(Greeting), {}", greeting);
counterService.recordCount(greeting);
}
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
我没有看到 CounterService
入站通道的任何 @StreamListener 配置。您还需要相同目的地的入站通道的绑定配置 input
.
捂脸时间! :)
所以我在后台有一个实际的监听器 运行,它正在接收所有测试从未通过的消息
我正在尝试为我的侦听器编写一个集成测试 spring 启动应用程序,在测试中将启动该应用程序,然后创建一条消息,将其放入输出通道,等待消息被挑选和加工。
这是我要测试的内容:
package com.example;
import com.netflix.discovery.converters.Auto;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
import org.springframework.web.context.WebApplicationContext;
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.integration.support.management.graph.LinkNode.Type.output;
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("scratch")
public class DemoApplicationTests {
@Component
@EnableBinding(Source.class)
static class TestMessageSource {
@Autowired
private Source source;
public void sendMessage(String message) {
new Thread(() -> {
Message<Greeting> msg = MessageBuilder.withPayload(new Greeting(message)).build();
source.output().send(msg);
}).start();
}
}
static class CounterFakeService implements CounterService {
public int count = 0;
public Greeting greeting;
@Override
public void recordCount(Greeting greeting) {
count++;
this.greeting = greeting;
}
}
private CounterFakeService fakeCounterService;
@Bean
CounterService counterService() {
return fakeCounterService;
}
@Autowired
TestMessageSource messageSource;
@Before
public void before() {
fakeCounterService = new CounterFakeService();
// this.mvc = MockMvcBuilders.webAppContextSetup(this.context).build();
}
@Test
public void doesProcessMessages() throws InterruptedException {
assertThat(fakeCounterService.count).isEqualTo(0);
messageSource.sendMessage("test");
Thread.sleep(5000);
assertThat(fakeCounterService.count).isEqualTo(1);
assertThat(fakeCounterService.greeting.getMessage()).isEqualTo("test");
}
}
并且在application-scratch.properties
中我有输出通道绑定到输入交换
spring.rabbitmq.host=rabbitmq.local.pcfdev.io
spring.rabbitmq.port=5672
spring.rabbitmq.password=i9jbk2o3ingqtkrekgm988bvui
spring.rabbitmq.username=8cf073b0-a2ff-450a-bee5-3954cb6c191f
spring.rabbitmq.virtual-host=5fc33451-4ec0-440e-90a1-6e7ed0c025f9
spring.cloud.stream.bindings.output.destination=input
但是,目前测试失败,因为没有收到任何消息。我应该寻找什么?
编辑 - 完整性的侦听器代码
PS:我已经验证这实际上与另一个应用程序一起工作,向 input
交换发送消息。只是无法让测试工作:(
@RestController
@SpringBootApplication
@EnableDiscoveryClient
@EnableBinding(Sink.class)
public class DemoApplication {
private static Logger log = LoggerFactory.getLogger(DemoApplication.class);
@Autowired
private Config config;
@Autowired
CounterServiceImpl counterService;
@StreamListener(Sink.INPUT)
public void handle(Greeting greeting) {
log.info("in handle(Greeting), {}", greeting);
counterService.recordCount(greeting);
}
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
我没有看到 CounterService
入站通道的任何 @StreamListener 配置。您还需要相同目的地的入站通道的绑定配置 input
.
捂脸时间! :) 所以我在后台有一个实际的监听器 运行,它正在接收所有测试从未通过的消息