Kafka Consumer/Producer test in Spring Kafka
I am currently working on a Kafka module where I am using the spring-kafka abstraction for Kafka communication. I am able to integrate the producer and consumer from an actual implementation standpoint; however, I am not sure how to test (specifically integration-test) the business logic around the consumer with @KafkaListener. I tried to follow the spring-kafka documentation and various blogs on the topic, but none of them answered my intended question.
Spring Boot test class
// imports omitted for brevity
@RunWith(SpringRunner.class)
@SpringBootTest(classes = PaymentAccountUpdaterApplication.class,
        webEnvironment = SpringBootTest.WebEnvironment.NONE)
public class CardUpdaterMessagingIntegrationTest {

    private final static String cardUpdateTopic = "TP.PRF.CARDEVENTS";

    @Autowired
    private ObjectMapper objectMapper;

    @ClassRule
    public static KafkaEmbedded kafkaEmbedded =
            new KafkaEmbedded(1, false, cardUpdateTopic);

    @Test
    public void sampleTest() throws Exception {
        // test-side consumer that captures everything published to the topic
        Map<String, Object> consumerConfig =
                KafkaTestUtils.consumerProps("test", "false", kafkaEmbedded);
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        ConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerConfig);
        ContainerProperties containerProperties = new ContainerProperties(cardUpdateTopic);
        KafkaMessageListenerContainer<String, String> container =
                new KafkaMessageListenerContainer<>(cf, containerProperties);
        BlockingQueue<ConsumerRecord<String, String>> records = new LinkedBlockingQueue<>();
        container.setupMessageListener((MessageListener<String, String>) data -> {
            System.out.println("Added to Queue: " + data);
            records.add(data);
        });
        container.setBeanName("templateTests");
        container.start();
        ContainerTestUtils.waitForAssignment(container, kafkaEmbedded.getPartitionsPerTopic());

        // test-side producer pointed at the embedded broker
        Map<String, Object> producerConfig = KafkaTestUtils.senderProps(kafkaEmbedded.getBrokersAsString());
        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        ProducerFactory<String, Object> pf =
                new DefaultKafkaProducerFactory<>(producerConfig);
        KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(pf);

        String payload = objectMapper.writeValueAsString(accountWrapper());
        kafkaTemplate.send(cardUpdateTopic, 0, payload);

        ConsumerRecord<String, String> received = records.poll(10, TimeUnit.SECONDS);
        assertThat(received).has(partition(0));
    }

    @After
    public void after() {
        kafkaEmbedded.after();
    }

    private AccountWrapper accountWrapper() {
        return AccountWrapper.builder()
                .eventSource("PROFILE")
                .eventName("INITIAL_LOAD_CARD")
                .eventTime(LocalDateTime.now().toString())
                .eventID("8730c547-02bd-45c0-857b-d90f859e886c")
                .details(AccountDetail.builder()
                        .customerId("idArZ_K2IgE86DcPhv-uZw")
                        .vaultId("912A60928AD04F69F3877D5B422327EE")
                        .expiryDate("122019")
                        .build())
                .build();
    }
}
Listener class
@Service
public class ConsumerMessageListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerMessageListener.class);

    private final ConsumerMessageProcessorService consumerMessageProcessorService;

    public ConsumerMessageListener(ConsumerMessageProcessorService consumerMessageProcessorService) {
        this.consumerMessageProcessorService = consumerMessageProcessorService;
    }

    @KafkaListener(id = "cardUpdateEventListener",
            topics = "${kafka.consumer.cardupdates.topic}",
            containerFactory = "kafkaJsonListenerContainerFactory")
    public void processIncomingMessage(Payload<AccountWrapper, Object> payloadContainer,
            Acknowledgment acknowledgment,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partitionId,
            @Header(KafkaHeaders.OFFSET) String offset) {
        try {
            // business logic to process the message
            consumerMessageProcessorService.processIncomingMessage(payloadContainer);
        } catch (Exception e) {
            LOGGER.error("Unhandled exception in card event message consumer. Discarding offset commit. " +
                    "message:: {}, topic:: {}, partition:: {}, offset:: {}",
                    e.getMessage(), topic, partitionId, offset);
            throw e;
        }
        acknowledgment.acknowledge();
    }
}
My question is: in the test class I am asserting the partition, payload, etc. polled from the BlockingQueue; however, my question is how to verify that my business logic in the class annotated with @KafkaListener is getting executed properly and routing the messages to different topics based on error handling and other business scenarios. In some examples I have seen a CountDownLatch used for assertions, which I do not want to put in my business logic just so it can be asserted in production-grade code. The message processor is also async, so I am not sure how to assert its execution. Any help is appreciated.
is getting executed properly and routing the messages to different topic based on error handling and other business scenarios.

The integration test can consume from that "different" topic to assert that the listener processed the message as expected.

You can also add a BeanPostProcessor to your test case and wrap the ConsumerMessageListener bean in a proxy to verify that the input arguments are as expected.
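For the first approach, a minimal sketch (not part of the original test): assuming the listener publishes rejected records to a hypothetical TP.PRF.CARDEVENTS.ERROR topic, the integration test can attach a plain consumer to that topic and assert on the routed record. Here kafkaTemplate and payload are built exactly as in sampleTest() above, and the error-topic name would also have to be passed to the KafkaEmbedded constructor so the broker creates it.

// Minimal sketch; "TP.PRF.CARDEVENTS.ERROR" is an assumed routing target.
Map<String, Object> errorConsumerProps =
        KafkaTestUtils.consumerProps("errorTopicGroup", "false", kafkaEmbedded);
errorConsumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
errorConsumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
Consumer<String, String> errorConsumer =
        new DefaultKafkaConsumerFactory<String, String>(errorConsumerProps).createConsumer();
kafkaEmbedded.consumeFromAnEmbeddedTopic(errorConsumer, "TP.PRF.CARDEVENTS.ERROR");

kafkaTemplate.send(cardUpdateTopic, 0, payload); // a payload the business logic rejects

// getSingleRecord fails the test if nothing arrives on the error topic, which
// proves the listener executed and took the error-handling route
ConsumerRecord<String, String> routed =
        KafkaTestUtils.getSingleRecord(errorConsumer, "TP.PRF.CARDEVENTS.ERROR");
assertThat(routed.value()).isEqualTo(payload); // assumes the original payload is forwarded
errorConsumer.close();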
EDIT

Here is an example of wrapping the listener in a proxy...
@SpringBootApplication
public class So53678801Application {

    public static void main(String[] args) {
        SpringApplication.run(So53678801Application.class, args);
    }

    @Bean
    public MessageConverter converter() {
        return new StringJsonMessageConverter();
    }

    public static class Foo {

        private String bar;

        public Foo() {
            super();
        }

        public Foo(String bar) {
            this.bar = bar;
        }

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }
    }
}

@Component
class Listener {

    @KafkaListener(id = "so53678801", topics = "so53678801")
    public void processIncomingMessage(Foo payload,
            Acknowledgment acknowledgment,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partitionId,
            @Header(KafkaHeaders.OFFSET) String offset) {
        System.out.println(payload);
        // ...
        acknowledgment.acknowledge();
    }
}
and
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.ack-mode=manual
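Spring Boot builds the listener container factory from these properties automatically; purely as an illustration of what they mean (a sketch assuming spring-kafka 2.2+, not a bean you need to declare when the properties above are present), the manual ack-mode setting corresponds to:

// Rough hand-rolled equivalent of spring.kafka.listener.ack-mode=manual;
// Spring Boot normally creates this factory for you from application.properties.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // MANUAL ack-mode is what makes the Acknowledgment parameter
    // available to the @KafkaListener method
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return factory;
}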
and
@RunWith(SpringRunner.class)
@SpringBootTest(classes = { So53678801Application.class,
        So53678801ApplicationTests.TestConfig.class })
public class So53678801ApplicationTests {

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, false, "so53678801");

    @BeforeClass
    public static void setup() {
        System.setProperty("spring.kafka.bootstrap-servers",
                embeddedKafka.getEmbeddedKafka().getBrokersAsString());
    }

    @Autowired
    private KafkaTemplate<String, String> template;

    @Autowired
    private ListenerWrapper wrapper;

    @Test
    public void test() throws Exception {
        this.template.send("so53678801", "{\"bar\":\"baz\"}");
        assertThat(this.wrapper.latch.await(10, TimeUnit.SECONDS)).isTrue();
        assertThat(this.wrapper.argsReceived[0]).isInstanceOf(Foo.class);
        assertThat(((Foo) this.wrapper.argsReceived[0]).getBar()).isEqualTo("baz");
        assertThat(this.wrapper.ackCalled).isTrue();
    }

    @Configuration
    public static class TestConfig {

        @Bean
        public static ListenerWrapper bpp() { // BPPs have to be static
            return new ListenerWrapper();
        }
    }

    public static class ListenerWrapper implements BeanPostProcessor, Ordered {

        private final CountDownLatch latch = new CountDownLatch(1);

        private Object[] argsReceived;

        private boolean ackCalled;

        @Override
        public int getOrder() {
            return Ordered.HIGHEST_PRECEDENCE;
        }

        @Override
        public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
            if (bean instanceof Listener) {
                // wrap the listener bean in a proxy so the test can observe its arguments
                ProxyFactory pf = new ProxyFactory(bean);
                pf.setProxyTargetClass(true); // unless the listener is on an interface
                pf.addAdvice(interceptor());
                return pf.getProxy();
            }
            return bean;
        }

        private MethodInterceptor interceptor() {
            return invocation -> {
                if (invocation.getMethod().getName().equals("processIncomingMessage")) {
                    Object[] args = invocation.getArguments();
                    this.argsReceived = Arrays.copyOf(args, args.length);
                    // swap in an Acknowledgment that records the call before delegating
                    Acknowledgment ack = (Acknowledgment) args[1];
                    args[1] = (Acknowledgment) () -> {
                        this.ackCalled = true;
                        ack.acknowledge();
                    };
                    try {
                        return invocation.proceed();
                    }
                    finally {
                        this.latch.countDown();
                    }
                }
                else {
                    return invocation.proceed();
                }
            };
        }
    }
}
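Note how the interceptor copies the listener's arguments and replaces the Acknowledgment (args[1]) with a wrapper that records the call before delegating to the real one, so the test can assert that acknowledge() was invoked, and how the latch is counted down in a finally block so the test is released even if the listener throws. All of the instrumentation lives in the test's BeanPostProcessor rather than in the production listener, which addresses the original concern about putting a CountDownLatch into business logic.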