Integration test using EmbeddedKafka: ContainerTestUtils.waitForAssignment throws "Expected 1 but got 0 partitions"
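I have written an integration test for my Kafka consumer using Spring Boot and the spring-kafka library. The test uses EmbeddedKafka with a topic that has one partition, and I use a KafkaMessageListenerContainer for it. However, I get an error on this line: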
ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic())
The error I get is:
java.lang.IllegalStateException: Expected 1 but got 0 partitions.
The code I referred to is: https://blog.mimacom.com/testing-apache-kafka-with-spring-boot-junit5/
@EmbeddedKafka(partitions = 1, ports = 9092)
@SpringBootTest(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
@RunWith(SpringRunner.class)
@DirtiesContext
@Profile("test")
@TestPropertySource({"classpath:application.yaml"})
public class OnboardingConsumerListenerTest {

    BlockingQueue<ConsumerRecord<String, String>> records;
    KafkaMessageListenerContainer<String, String> container;

    @Autowired
    protected EmbeddedKafkaBroker embeddedKafkaBroker;

    public ConsumerFactory<String, String> consumerFactory;

    @Value("${spring.kafka.client_topic}")
    private String topicName;

    @Value("${spring.kafka.group_id}")
    private String groupId;

    @Before
    public void setUp() {
        consumerFactory = getKafkaConsumer(embeddedKafkaBroker, groupId, topicName);
        ContainerProperties containerProperties = new ContainerProperties(topicName);
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        records = new LinkedBlockingQueue<>();
        // Collect every record the listener receives so the test can assert on it.
        container.setupMessageListener(new MessageListener<String, String>() {
            @Override
            public void onMessage(ConsumerRecord<String, String> record) {
                records.add(record);
            }
        });
        container.start();
        // Fails here with "Expected 1 but got 0 partitions".
        ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
    }
}
The getKafkaConsumer() function is:
public ConsumerFactory<String, String> getKafkaConsumer(EmbeddedKafkaBroker embeddedKafkaBroker,
                                                        String group,
                                                        String topic) {
    Map<String, Object> consumerProps =
            KafkaTestUtils.consumerProps(group, "false", embeddedKafkaBroker);
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            embeddedKafkaBroker.getBrokersAsString());
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
    consumerProps.put("schema.registry.url", "bogus");
    consumerProps.put("specific.avro.reader", true);
    ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps);
    return consumerFactory;
}
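It is hard to say what is happening without more information.
An alternative is to assign the partitions manually instead of waiting for group management to assign them: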
// List only partitions that exist on the topic; a single-partition topic has only partition 0.
ContainerProperties containerProperties =
        new ContainerProperties(new TopicPartitionOffset(topicName, 0),
                new TopicPartitionOffset(topicName, 1));
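To make the failure mode easier to reason about, here is a minimal plain-Java sketch of what a "wait for assignment" helper does conceptually: it polls the number of assigned partitions until it reaches the expected count or the attempts run out, then throws. This is a simplified model, not the spring-kafka source; the class name AssignmentWait, the method waitForCount and its parameters are hypothetical names made up for the illustration. It shows why a consumer that never gets an assignment ends with a message like the one in the question.

```java
import java.util.function.IntSupplier;

class AssignmentWait {

    /**
     * Polls assignedCount until it reaches expected or maxAttempts polls have been made.
     * Hypothetical helper for illustration only; not the spring-kafka implementation.
     */
    public static void waitForCount(IntSupplier assignedCount, int expected,
                                    int maxAttempts, long sleepMillis) {
        int count = assignedCount.getAsInt();
        for (int attempt = 1; attempt < maxAttempts && count < expected; attempt++) {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for assignment", e);
            }
            count = assignedCount.getAsInt();
        }
        if (count != expected) {
            throw new IllegalStateException(
                    String.format("Expected %d but got %d partitions", expected, count));
        }
    }

    public static void main(String[] args) {
        // A consumer that never receives an assignment, for example because it
        // cannot reach the broker or the group never finishes rebalancing.
        try {
            waitForCount(() -> 0, 1, 3, 10L);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }

        // A consumer whose assignment shows up on the third poll.
        int[] polls = {0};
        waitForCount(() -> ++polls[0] >= 3 ? 1 : 0, 1, 10, 10L);
        System.out.println("assigned after " + polls[0] + " polls");
    }
}
```

In this model the exception only says that no assignment arrived in time, not why. With manual assignment as shown above, the container does not depend on a group rebalance to obtain its partitions.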