如何为 Spring Kafka Listener 创建集成测试
How to Create Integration Test for Spring Kafka Listener
我有一个微服务向另一个微服务发送应该使用的消息。
因此,kafka 配置有效,一切正常,但我需要为此代码创建一个集成测试,但我不知道该怎么做。
我的 KafkaConsumer.Class 已添加组件注释:
private static final Logger logger = LoggerFactory.getLogger(KafkaReactionConsumerMessageComponent.class);
private final ReactionsService reactionsService;
public KafkaReactionConsumerMessageComponent(ReactionsService reactionsService) {
this.reactionsService = reactionsService;
}
@KafkaListener(topics = "reaction-topic", clientIdPrefix = "string", groupId = "magpie-trending")
public void consumingReactionMessages(ConsumerRecord<String, String> cr,
@Payload String payload){
logger.info("[JSON] received Payload: {}", payload);
try {
ObjectMapper mapper = new ObjectMapper();
ReactionMessage message = mapper.readValue(payload, ReactionMessage.class);
if(StringUtils.equals("unloved", message.getReactionType())) {
reactionsService.deleteReactionsByUserIdAndPostId(message.getPost().getPostId(), message.getUser().getUserId());
logger.info("Deleted reactions from database with postId: {} and userId: {}", message.getPost().getPostId(), message.getUser().getUserId());
} else {
List<Reaction> reactions = creatReactions(message).stream()
.map(reactionsService::insertReaction).collect(Collectors.toList());
logger.info("Added reactions to database: {}", reactions);
}
} catch (Exception e){
logger.error("Cannot Deserialize payload to ReactionMessage");
}
}
我的集成测试是
private static final String TOPIC = "reaction-topic";
private final Logger logger = LoggerFactory.getLogger(KafkaDeletePostConsumerMessageComponent.class);
private final KafkaReactionConsumerMessageComponent kafkaReactionConsumerMessageComponent;
private final EmbeddedKafkaBroker embeddedKafkaBroker;
private Consumer<String, String> consumer;
@SuppressWarnings("SpringJavaAutowiringInspection")
@Autowired
public KafkaReactionMessageConsumerTest(KafkaReactionConsumerMessageComponent kafkaReactionConsumerMessageComponent,
EmbeddedKafkaBroker embeddedKafkaBroker) {
this.kafkaReactionConsumerMessageComponent = kafkaReactionConsumerMessageComponent;
this.embeddedKafkaBroker = embeddedKafkaBroker;
}
@BeforeEach
public void setUp() {
Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "true", embeddedKafkaBroker));
consumer = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer()).createConsumer();
consumer.subscribe(Collections.singleton(TOPIC));
consumer.poll(Duration.ZERO);
}
@AfterEach
public void tearDown() {
consumer.close();
}
@Test
public void shoudlConsumeAndInsertInDatabaseReactionDomain() {
ReactionMessage reactionMessage = new ReactionMessage(new PostMessage("1", Set.of("a", "b", "c")),
new UserMessage("2"), LocalDateTime.now().toString(), "loved");
Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();
producer.send(new ProducerRecord<>(TOPIC, "1", reactionMessage.toString()));
producer.flush();
assertEquals(3, mongoTemplate.getCollection("reactions").countDocuments());
}
抽象类:
@ExtendWith(SpringExtension.class)
@SpringBootTest
@AutoConfigureMockMvc
@EmbeddedKafka(brokerProperties={
"log.dir=out/embedded-kafka"
})
public abstract class AbstractMongoEmbeddedTest {
@Autowired
private static MongodExecutable mongodExecutable;
@Autowired
protected MongoTemplate mongoTemplate;
@BeforeEach
private void dropPostCollection(){
mongoTemplate.dropCollection(Reaction.class);
}
由于您使用的是嵌入式 Kafka 代理,因此您可以简单地从集成测试中 produce/consume 所需的主题。
消费
可以通过简单的 jUnit 规则进行消费。可以找到用于此目的的规则 here。放心使用吧。
您可以像这样使用它来声明已消费的消息:
assertThat(kafkaConsumeMessagesRule.pollMessage()).isEqualTo("your-expected-message");
制作中
要生成消息,您只需在集成测试中连接 org.springframework.kafka.core.KafkaTemplate
并将消息发送到给定主题。
我有一个微服务向另一个微服务发送应该使用的消息。
因此,kafka 配置有效,一切正常,但我需要为此代码创建一个集成测试,但我不知道该怎么做。
我的 KafkaConsumer.Class 已添加组件注释:
private static final Logger logger = LoggerFactory.getLogger(KafkaReactionConsumerMessageComponent.class);
private final ReactionsService reactionsService;
public KafkaReactionConsumerMessageComponent(ReactionsService reactionsService) {
this.reactionsService = reactionsService;
}
@KafkaListener(topics = "reaction-topic", clientIdPrefix = "string", groupId = "magpie-trending")
public void consumingReactionMessages(ConsumerRecord<String, String> cr,
@Payload String payload){
logger.info("[JSON] received Payload: {}", payload);
try {
ObjectMapper mapper = new ObjectMapper();
ReactionMessage message = mapper.readValue(payload, ReactionMessage.class);
if(StringUtils.equals("unloved", message.getReactionType())) {
reactionsService.deleteReactionsByUserIdAndPostId(message.getPost().getPostId(), message.getUser().getUserId());
logger.info("Deleted reactions from database with postId: {} and userId: {}", message.getPost().getPostId(), message.getUser().getUserId());
} else {
List<Reaction> reactions = creatReactions(message).stream()
.map(reactionsService::insertReaction).collect(Collectors.toList());
logger.info("Added reactions to database: {}", reactions);
}
} catch (Exception e){
logger.error("Cannot Deserialize payload to ReactionMessage");
}
}
我的集成测试是
private static final String TOPIC = "reaction-topic";
private final Logger logger = LoggerFactory.getLogger(KafkaDeletePostConsumerMessageComponent.class);
private final KafkaReactionConsumerMessageComponent kafkaReactionConsumerMessageComponent;
private final EmbeddedKafkaBroker embeddedKafkaBroker;
private Consumer<String, String> consumer;
@SuppressWarnings("SpringJavaAutowiringInspection")
@Autowired
public KafkaReactionMessageConsumerTest(KafkaReactionConsumerMessageComponent kafkaReactionConsumerMessageComponent,
EmbeddedKafkaBroker embeddedKafkaBroker) {
this.kafkaReactionConsumerMessageComponent = kafkaReactionConsumerMessageComponent;
this.embeddedKafkaBroker = embeddedKafkaBroker;
}
@BeforeEach
public void setUp() {
Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "true", embeddedKafkaBroker));
consumer = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer()).createConsumer();
consumer.subscribe(Collections.singleton(TOPIC));
consumer.poll(Duration.ZERO);
}
@AfterEach
public void tearDown() {
consumer.close();
}
@Test
public void shoudlConsumeAndInsertInDatabaseReactionDomain() {
ReactionMessage reactionMessage = new ReactionMessage(new PostMessage("1", Set.of("a", "b", "c")),
new UserMessage("2"), LocalDateTime.now().toString(), "loved");
Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();
producer.send(new ProducerRecord<>(TOPIC, "1", reactionMessage.toString()));
producer.flush();
assertEquals(3, mongoTemplate.getCollection("reactions").countDocuments());
}
抽象类:
@ExtendWith(SpringExtension.class)
@SpringBootTest
@AutoConfigureMockMvc
@EmbeddedKafka(brokerProperties={
"log.dir=out/embedded-kafka"
})
public abstract class AbstractMongoEmbeddedTest {
@Autowired
private static MongodExecutable mongodExecutable;
@Autowired
protected MongoTemplate mongoTemplate;
@BeforeEach
private void dropPostCollection(){
mongoTemplate.dropCollection(Reaction.class);
}
由于您使用的是嵌入式 Kafka 代理,因此您可以简单地从集成测试中 produce/consume 所需的主题。
消费
可以通过简单的 jUnit 规则进行消费。可以找到用于此目的的规则 here。放心使用吧。
您可以像这样使用它来声明已消费的消息:
assertThat(kafkaConsumeMessagesRule.pollMessage()).isEqualTo("your-expected-message");
制作中
要生成消息,您只需在集成测试中连接 org.springframework.kafka.core.KafkaTemplate
并将消息发送到给定主题。