NPE 使用嵌入式 Kafka 测试 Kafka Producer

NPE Testing Kafka Producer Using Embedded Kafka

我写了一个基本的 spring 引导服务,它通过 rest API 消耗一些数据并将其发布到 rabbitmq 和 kafka。

为了测试处理 kafka 生产的服务 class,我遵循了这个指南:https://www.baeldung.com/spring-boot-kafka-testing

孤立地,测试 (KafkaMessagingServiceImplTest) 在 intellij idea 和命令行上通过 mvn 完美运行。 运行 idea 中的所有项目测试工作正常。但是,当我 运行 在命令行上通过 maven 测试所有项目时,当尝试对有效负载字符串进行断言时,此测试失败并出现 NPE。

我已经将根本问题的位置缩小到另一个测试 class (AppPropertiesTest),它只测试我的 AppProperties 组件(这是我用来从 application.properties 中提取配置的组件以一种整洁的方式)。当且仅当该测试 class 中的测试与项目根目录中使用 'mvn clean install' 的失败测试一起 运行 时,NPE 才会出现。注释掉此 class 中的测试或使用 @DirtiesContext 对其进行注释可以解决问题。显然,此测试 class 加载到 spring 上下文的某些内容会导致另一个测试中 events/countdownlatch 的 timing/order 出现问题。当然,我不想使用@DirtiesContext,因为随着项目复杂性的增加,它会导致构建速度变慢。它也没有解释问题..我无法处理:)

AppPropertiesTest 使用构造函数注入来注入 AppProperties 组件。它还扩展了一个摘要 class 'GenericServiceTest' ,它被注释为:

@SpringBootTest
@TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL) 

并且不包含任何其他内容。正如您可能知道的那样,SpringBootTest 注释构建了一个测试 spring 上下文并在样板中进行了连接,以允许对 spring 应用程序的依赖项注入等进行有效测试,而 TestConstructor 注释允许在我的一些测试中进行构造函数注入。 FWIW,我已经尝试删除 TestConstructor 注释并在 AppProperties class 中使用普通的旧自动装配来查看它是否有所作为,但它没有。

失败的测试 class 也扩展了 GenericServiceTest,因为它需要 spring 上下文来注入一些依赖项,例如正在测试的消费者和消息服务以及其中的 AppProperties 实例等。

所以我知道问题出在哪里,但我不知道问题是什么。即使在 NPE 测试失败时,根据 Baeldung 指南,我也可以在日志中看到消费者在失败之前已成功使用消息:

TestKafkaConsumer  : received payload='ConsumerRecord(topic = test-kafka-topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1618997289238, serialized key size = -1, serialized value size = 43, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = This is a test message to be sent to Kafka.)'

然而,当我们回到断言时,payLoad 为空。我在失败的测试中尝试了各种方法,例如 Thread.sleep() 以给它更多时间,并且我增加了 await() 超时但没有快乐。

我觉得奇怪的是 IDEA 和孤立的测试都很好。现在它开始让我有点抓狂,我无法调试它,因为问题没有出现在我的 IDE.

如果有人有任何想法,将不胜感激!

谢谢。

编辑:有人非常合理地建议我添加一些代码,所以这里是:)

失败测试(在 assertTrue(payload.contains(testMessage)) 处失败,因为 payLoad 为空)。自动装配的 kafkaMessagingService 仅注入了 AppProperties 和 KakfaTemplate 的依赖项并调用 kafkaTemplate.send():


@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class KafkaMessagingServiceImplTest extends GenericServiceTest {

    @Autowired
    @Qualifier("kafkaMessagingServiceImpl")
    private IMessagingService messagingService;
    @Autowired
    private TestKafkaConsumer kafkaConsumer;
    @Value("${app.topicName}")
    private String testTopic;

    @Test
    public void testSendAndConsumeKafkaMessage() throws InterruptedException {
        String testMessage = "This is a test message to be sent to Kafka.";
        messagingService.sendMessage(testMessage);
        kafkaConsumer.getLatch().await(2000, TimeUnit.MILLISECONDS);
        String payload = kafkaConsumer.getPayload();
        assertTrue(payload.contains(testMessage));
    }

TestConsumer(上面测试中用来消费的)

@Component
public class TestKafkaConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(TestKafkaConsumer.class);

    private CountDownLatch latch = new CountDownLatch(1);
    private String payload = null;

    @KafkaListener(topics = "${app.topicName}")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        LOGGER.info("received payload='{}'", consumerRecord.toString());
        setPayload(consumerRecord.toString());

        latch.countDown();
    }

    public CountDownLatch getLatch() {
        return latch;
    }

    public String getPayload() {
        return payload;
    }

    public void setPayload(String payload) {
        this.payload = payload;
    }

项目依赖项:

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.2.2.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.5.8.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.mockito/mockito-all -->
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
            <version>1.10.19</version>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-test -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>2.5.6.RELEASE</version>
            <scope>test</scope>
        </dependency>

    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

AppPropertiesTest class(其上下文似乎导致了问题)

class AppPropertiesTest extends GenericServiceTest {

    private final AppProperties appProperties;

    public AppPropertiesTest(AppProperties appProperties) {
        this.appProperties = appProperties;
    }

    @Test
    public void testAppPropertiesGetQueueName() {
        String expected = "test-queue";
        String result = appProperties.getRabbitMQQueueName();
        assertEquals(expected, result);
    }

    @Test
    public void testAppPropertiesGetDurableQueue() {
        boolean isDurableQueue = appProperties.isDurableQueue();
        assertTrue(isDurableQueue);
    }
}

AppPropertiesTest class 正在测试的 AppProperties class:

@Component
@ConfigurationProperties("app")
public class AppProperties {

    // a whole bunch of properties by name that are prefixed by app. in the application.properties file. Nothing else
}

通用服务测试 class 两个测试都扩展了。

@SpringBootTest
@TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
public abstract class GenericServiceTest {

}

失败(你可以在上面的行中看到有效载荷已被接收并打印出来)。

2021-04-21 14:15:07.113  INFO 493384 --- [ntainer#0-0-C-1] service.TestKafkaConsumer  : received payload='ConsumerRecord(topic = test-kafka-topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1619010907076, serialized key size = -1, serialized value size = 43, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = This is a test message to be sent to Kafka.)'
[ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 3.791 s <<< FAILURE! - in 
service.KafkaMessagingServiceImplTest
[ERROR] testSendAndConsumeKafkaMessage  Time elapsed: 2.044 s  <<< ERROR!
java.lang.NullPointerException
    at service.KafkaMessagingServiceImplTest.testSendAndConsumeKafkaMessage(KafkaMessagingServiceImplTest.java:42)

问题是 TestListener 是一个 @Component 所以它被添加了两次 - 记录将转到另一个实例。

我添加了更多调试以验证 getter 是在不同的实例上调用的。

@Component
public class TestKafkaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(TestKafkaConsumer.class);

    private final CountDownLatch latch = new CountDownLatch(1);
    private String payload = null;


    @KafkaListener(id = "myListener", topics = "${app.kafkaTopicName}")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        LOGGER.info("received payload='{}'", consumerRecord.toString());
        setPayload(consumerRecord.toString());

        if (payload != null) {
            LOGGER.info(this + ": payload is not null still");
        }

        latch.countDown();

        if (payload != null) {
            LOGGER.info(this + ": payload is not null after latch countdown");
        }
    }

    public CountDownLatch getLatch() {
        return latch;
    }

    public String getPayload() {
        LOGGER.info(this + ": getting Payload");
        return payload;
    }

    public void setPayload(String payload) {
        this.payload = payload;
    }
}

如果您不想使用 @DirtiesContext,您至少可以在测试完成后停止侦听器容器:

@SpringBootTest
@TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
public abstract class GenericDataServiceTest {

    @AfterAll
    static void stopContainers(@Autowired KafkaListenerEndpointRegistry registry) {
        registry.stop();
    }

}
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------