Spring Kafka integration testing with embedded Kafka
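I have a Spring Boot application with a consumer that consumes from a topic in one cluster and produces to another topic in a different cluster.
I am now trying to write integration test cases using Spring's embedded Kafka, but I run into this problem: KafkaTemplate could not be registered. A bean with that name has already been defined in class path resource
Consumer class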
@Service
public class KafkaConsumerService {

    @Autowired
    private KafkaProducerService kafkaProducerService;

    @KafkaListener(topics = "${kafka.producer.topic}")
    public void professor(List<Professor> pro) {
        pro.forEach(kafkaProducerService::produce);
    }
}
Producer class
@Service
public class KafkaProducerService {

    @Value("${kafka.producer.topic}")
    private String topic;

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void produce(Professor pro) {
        kafkaTemplate.send(topic, "professor", pro);
    }
}
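In my test case I want to override KafkaTemplate so that, when I call the kafkaConsumerService.professor method in the test, it produces the data to the embedded Kafka, where I can then verify it.
Test configuration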
@TestConfiguration
@EmbeddedKafka(partitions = 1, controlledShutdown = false,
        brokerProperties = {"listeners=PLAINTEXT://localhost:3333", "port=3333"})
public class KafkaProducerConfigTest {

    @Autowired
    KafkaEmbedded kafkaEmbeded;

    @Autowired
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Before
    public void setUp() throws Exception {
        for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
            ContainerTestUtils.waitForAssignment(messageListenerContainer,
                    kafkaEmbeded.getPartitionsPerTopic());
        }
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaEmbeded));
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        return kafkaTemplate;
    }
}
Test class
@EnableKafka
@SpringBootTest(classes = {KafkaProducerConfigTest.class})
@RunWith(SpringRunner.class)
public class KafkaProducerServiceTest {

    @Autowired
    private KafkaConsumerService kafkaConsumerService;

    @Test
    public void testReceive() throws Exception {
        kafkaConsumerService.professor(Arrays.asList(new Professor()));
        // How can I check that the message was sent to Kafka?
    }
}
Error
The bean 'kafkaTemplate', defined in com.kafka.configuration.KafkaProducerConfigTest, could not be registered.
A bean with that name has already been defined in class path resource [com/kafka/configuration/KafkaProducerConfig.class] and overriding is disabled.
Consider renaming one of the beans or enabling overriding by setting spring.main.allow-bean-definition-overriding=true
Also, can anyone help me verify the messages sent to the embedded Kafka server?
Note: I also get some deprecation warnings
The type KafkaEmbedded is deprecated
The method getPartitionsPerTopic() from the type KafkaEmbedded is deprecated
The method producerProps(KafkaEmbedded) from the type KafkaTestUtils is deprecated
Spring Boot 2.1 disables bean overriding by default.
Bean overriding has been disabled by default to prevent a bean being accidentally overridden. If you are relying on overriding, you will need to set spring.main.allow-bean-definition-overriding to true.
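One way to set that property for a single test only, rather than for the whole application, is the properties attribute of @SpringBootTest. This is a minimal sketch: the class name is hypothetical, and it assumes JUnit 4 (as in the question) and a @SpringBootApplication class that the test can find in a parent package.

```java
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

// Hypothetical test class; the `properties` attribute is the only point of interest.
@RunWith(SpringRunner.class)
@SpringBootTest(properties = "spring.main.allow-bean-definition-overriding=true")
public class BeanOverridingEnabledTest {

    @Test
    public void contextLoads() {
        // Passes if the application context starts without a bean-definition conflict.
    }
}
```

The same key can also go into a test-only application.properties file. Renaming the test bean, as the error message suggests, avoids the conflict without re-enabling overriding.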
Regarding the deprecations: see the javadoc for @EmbeddedKafka. KafkaEmbedded has been replaced by EmbeddedKafkaBroker.
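For the verification part of the question, one possible approach is to read the record back with a test-side consumer attached to the embedded broker. The following is only a sketch under stated assumptions: spring-kafka-test 2.2 or later (where EmbeddedKafkaBroker is available) and JUnit 4 as in the question. The class name, the topic name professor-out, the group id verify-group and the String payload are all hypothetical; the real application sends Professor objects and would need matching serializers and deserializers.

```java
import static org.junit.Assert.assertEquals;

import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@EmbeddedKafka(partitions = 1, topics = EmbeddedKafkaVerificationSketchTest.TOPIC)
public class EmbeddedKafkaVerificationSketchTest {

    // Hypothetical topic name, created on the embedded broker by @EmbeddedKafka.
    static final String TOPIC = "professor-out";

    // Replaces the deprecated KafkaEmbedded type.
    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    public void sentRecordCanBeReadBack() throws Exception {
        // Producer pointed at the embedded broker; String key and value for simplicity.
        Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        KafkaTemplate<String, String> template =
                new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerProps));

        // Test-side consumer used only to verify what reached the topic.
        Map<String, Object> consumerProps =
                KafkaTestUtils.consumerProps("verify-group", "true", embeddedKafka);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        Consumer<String, String> consumer =
                new DefaultKafkaConsumerFactory<String, String>(consumerProps).createConsumer();

        try {
            embeddedKafka.consumeFromAnEmbeddedTopic(consumer, TOPIC);

            template.send(TOPIC, "professor", "payload");
            template.flush();

            // Waits for exactly one record on the topic and fails otherwise.
            ConsumerRecord<String, String> record = KafkaTestUtils.getSingleRecord(consumer, TOPIC);
            assertEquals("professor", record.key());
            assertEquals("payload", record.value());
        } finally {
            consumer.close();
        }
    }

    // Empty configuration so the plain Spring test context has something to load.
    @Configuration
    static class EmptyConfig {
    }
}
```

In the application from the question, the send would instead go through kafkaConsumerService.professor(...), with the application's KafkaTemplate pointed at the embedded broker, for example through the spring.embedded.kafka.brokers property that the embedded broker publishes.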