Integration test using EmbeddedKafka: ContainerTestUtils.waitForAssignment throws "Expected 1 but got 0 partitions"

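I have written an integration test for my Kafka consumer using Spring Boot and the spring-kafka library. The test uses EmbeddedKafka with a topic that has a single partition, and I consume from it with a KafkaMessageListenerContainer. However, I get an error on this line: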

ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic())

The error I get is:

java.lang.IllegalStateException: Expected 1 but got 0 partitions.

The code I based this on is: https://blog.mimacom.com/testing-apache-kafka-with-spring-boot-junit5/

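import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Profile;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;

@EmbeddedKafka(partitions = 1, ports = 9092)
@SpringBootTest(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
@RunWith(SpringRunner.class)
@DirtiesContext
@Profile("test")
@TestPropertySource({"classpath:application.yaml"})
public class OnboardingConsumerListenerTest {

    // Records received by the listener end up here so the test can assert on them.
    BlockingQueue<ConsumerRecord<String, String>> records;

    KafkaMessageListenerContainer<String, String> container;

    @Autowired
    protected EmbeddedKafkaBroker embeddedKafkaBroker;

    public ConsumerFactory<String, String> consumerFactory;

    @Value("${spring.kafka.client_topic}")
    private String topicName;

    @Value("${spring.kafka.group_id}")
    private String groupId;

    @Before
    public void setUp() {
        consumerFactory = getKafkaConsumer(embeddedKafkaBroker, groupId, topicName);
        ContainerProperties containerProperties = new ContainerProperties(topicName);
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        records = new LinkedBlockingQueue<>();
        container.setupMessageListener(new MessageListener<String, String>() {
            @Override
            public void onMessage(ConsumerRecord<String, String> record) {
                records.add(record);
            }
        });

        container.start();
        // This is the call that throws "Expected 1 but got 0 partitions".
        ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
    }
}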

The getKafkaConsumer() function is:

// Imports needed by this helper:
import java.util.Map;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.test.utils.KafkaTestUtils;

public ConsumerFactory<String, String> getKafkaConsumer(EmbeddedKafkaBroker embeddedKafkaBroker,
                                                        String group,
                                                        String topic) {
    // Start from the test defaults (auto-commit disabled), then override what the test needs.
    Map<String, Object> consumerProps =
            KafkaTestUtils.consumerProps(group, "false", embeddedKafkaBroker);
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            embeddedKafkaBroker.getBrokersAsString());
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
    consumerProps.put("schema.registry.url", "bogus");
    consumerProps.put("specific.avro.reader", true);
    return new DefaultKafkaConsumerFactory<String, String>(consumerProps);
}

Without more information, it is hard to say what is going wrong.

An alternative is to assign the partitions manually instead of waiting for group management to assign them:

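// The topic in the question has a single partition, so only partition 0 is listed;
// add one TopicPartitionOffset per partition for topics that have more.
ContainerProperties containerProperties =
    new ContainerProperties(new TopicPartitionOffset(topicName, 0));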
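
For context on the exception itself: waitForAssignment repeatedly checks how many partitions the container has been assigned and gives up once its wait is exhausted. The following is a simplified, stdlib-only model of that kind of polling check, not the spring-kafka source. The class name AssignmentWaitSketch, the parameters maxAttempts and sleepMillis, and the IntSupplier standing in for the container are all assumptions made for illustration.

```java
import java.util.function.IntSupplier;

// Simplified model of a "wait for partition assignment" check (hypothetical, not library code).
class AssignmentWaitSketch {

    // Polls assignedCount up to maxAttempts times, sleeping between polls,
    // and throws if the last observed count differs from the expected one.
    static int waitForAssignment(IntSupplier assignedCount, int expected,
                                 int maxAttempts, long sleepMillis) {
        int count = assignedCount.getAsInt();
        for (int attempt = 1; attempt < maxAttempts && count < expected; attempt++) {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
            count = assignedCount.getAsInt();
        }
        if (count != expected) {
            throw new IllegalStateException(
                    String.format("Expected %d but got %d partitions", expected, count));
        }
        return count;
    }

    public static void main(String[] args) {
        // A consumer that is assigned its partition on the third poll: the wait succeeds.
        int[] polls = {0};
        System.out.println(waitForAssignment(() -> (++polls[0] >= 3) ? 1 : 0, 1, 10, 5));

        // A consumer that never receives a partition: the wait fails with the
        // same message as the test in the question.
        try {
            waitForAssignment(() -> 0, 1, 3, 5);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Read this way, "Expected 1 but got 0 partitions" means the consumer never received its partition from the group within the wait, which usually points at the consumer failing to reach the broker or join the group rather than at the wait call itself. Manual assignment avoids that dependency because it does not go through group management.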