Spring Cloud Stream (Hoxton) Kafka Producer / Consumer not working in integration tests with EmbeddedKafka
I have a working application that uses the latest producer updates shipped with Hoxton. Now I am trying to add some integration tests asserting that the producer actually produces messages as expected. The problem is that the consumer I use in the test never reads anything from the topic.

To make the problem reproducible, I reused a project from the Spring Cloud Stream samples (spring-cloud-stream-samples/source-samples/dynamic-destination-source-kafka) and adapted it as follows:

DynamicDestinationSourceApplication (the EmitterProcessor is now a bean):
@SpringBootApplication
@RestController
public class DynamicDestinationSourceApplication {

    @Autowired
    private ObjectMapper jsonMapper;

    @Autowired
    private EmitterProcessor<Message<?>> processor;

    public static void main(String[] args) {
        SpringApplication.run(DynamicDestinationSourceApplication.class, args);
    }

    @SuppressWarnings("unchecked")
    @RequestMapping(path = "/", method = POST, consumes = "*/*")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest(@RequestBody String body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) throws Exception {
        Map<String, String> payload = jsonMapper.readValue(body, Map.class);
        String destinationName = payload.get("id");
        Message<?> message = MessageBuilder.withPayload(payload)
                .setHeader("spring.cloud.stream.sendto.destination", destinationName).build();
        processor.onNext(message);
    }

    @Bean
    public Supplier<Flux<Message<?>>> supplier() {
        return () -> processor;
    }

    @Bean
    public EmitterProcessor<Message<?>> processor() {
        return EmitterProcessor.create();
    }

    // The following sink is used as a test consumer. It logs the data received through the consumer.
    static class TestSink {

        private final Log logger = LogFactory.getLog(getClass());

        @Bean
        public Consumer<String> receive1() {
            return data -> logger.info("Data received from customer-1..." + data);
        }

        @Bean
        public Consumer<String> receive2() {
            return data -> logger.info("Data received from customer-2..." + data);
        }
    }
}
ModuleApplicationTests:
@EmbeddedKafka
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = DynamicDestinationSourceApplication.class)
@WebAppConfiguration
@DirtiesContext
public class ModuleApplicationTests {

    private static String TOPIC = "someTopic";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Autowired
    private EmitterProcessor<Message<?>> processor;

    @Test
    public void shouldProduceAndConsume() {
        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
        Consumer<String, String> consumer = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer()).createConsumer();
        consumer.subscribe(Collections.singleton(TOPIC));
        consumer.poll(0);

        Message<?> message = MessageBuilder.withPayload(new HashMap<String, String>() {{ put("somekey", "somevalue"); }})
                .setHeader("spring.cloud.stream.sendto.destination", TOPIC).build();
        processor.onNext(message);

        ConsumerRecord<String, String> someRecord = KafkaTestUtils.getSingleRecord(consumer, TOPIC);
        System.out.println(someRecord);
    }
}
This ends with "No records found for topic". Why does this not work during the test?
Update:

My actual project is not exactly the same as the one above. There I can see that emitterProcessor.onNext() never ends up calling AbstractMessageHandler.onNext(). Debugging into emitterProcessor.onNext(), I see that it calls drain(), and at FluxPublish.PubSubInner<T>[] a = subscribers; the subscribers field is an empty array, whereas during normal application execution it contains an EmitterProcessor.
It turned out I had mistakenly added testImplementation("org.springframework.cloud:spring-cloud-stream-test-support") as a dependency. That module activates the test binder, which is not intended for integration tests: it replaces the real Kafka binder in the test context, so the supplier is never wired to the embedded broker and nothing is published.
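Under that diagnosis, the fix is a build-file change: drop the test-support dependency so the real Kafka binder is used in tests, and rely on spring-kafka-test for the embedded broker. This is a sketch in Gradle Kotlin DSL (matching the testImplementation syntax quoted above); your binder coordinates may differ:

```kotlin
// build.gradle.kts (sketch) -- keep the real Kafka binder on the test classpath
dependencies {
    implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka")

    // Do NOT add this: it pulls in the test binder, which replaces the Kafka
    // binder in tests, so produced messages never reach the embedded broker.
    // testImplementation("org.springframework.cloud:spring-cloud-stream-test-support")

    // Provides @EmbeddedKafka / EmbeddedKafkaBroker for integration tests instead.
    testImplementation("org.springframework.kafka:spring-kafka-test")
}
```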