测试容器中的 Kafka 侦听器,避开端口 9092

Kafka listener in test container, avoiding port 9092

我编写了一个集成测试来表明发送到 Kafka 的消息将通过侦听器到达。当且仅当我使用 KAFKA_PORT=9092 时它才会通过。该常量是开发人员机器(或 CI 机器)上使用的端口。

最终我想在动态分配的端口上执行此操作(即使用 GenericContainer 而不是 FixedHostPortGenericContainer),但目前我只想使用不同的端口.

如果我在下面的代码中设置 KAFKA_PORT=59092 那么测试失败并且我看到控制台输出如 Connection to node -1 (localhost/127.0.0.1:9092) could not be established 例如:

2020-06-08 12:16:22.374  WARN 1371 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=test-consumer-group] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

我假设我需要做一些额外的配置,这样就不会 尝试使用端口 9092,但这让我望而却步。

下面是一个重新创建的、剥离后的测试,以及相关的 gradle.build

KafaSpikeFixedPort.java

package com.example.kafkaspike;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.FixedHostPortGenericContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
@Testcontainers
public class KafaSpikeFixedPort {

    // Test works only if this port is 9092 (matching the Docker container port)
    final static int KAFKA_PORT = 9092;

    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.producer.bootstrap-servers",
                     () -> "kubernetes.docker.internal:"+ KAFKA_PORT);
        registry.add("spring.kafka.consumer.group-id",
                     () -> "test-consumer-group");
    }

    @Container
    private GenericContainer kafkaContainer =
            new FixedHostPortGenericContainer("obsidiandynamics/kafka:2.3.0-11")
                    .withFixedExposedPort(KAFKA_PORT, 9092)
                    .withExtraHost("kubernetes.docker.internal", "127.0.0.1")
                    .withEnv("KAFKA_LISTENERS",
                             "INTERNAL://:29092," +
                             "EXTERNAL://:"+KAFKA_PORT)
                    .withEnv("KAFKA_ADVERTISED_LISTENERS",
                             "INTERNAL://kubernetes.docker.internal:29092," +
                             "EXTERNAL://kubernetes.docker.internal:"+KAFKA_PORT)
                    .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
                             "INTERNAL:PLAINTEXT," +
                             "EXTERNAL:PLAINTEXT")
                    .withEnv("KAFKA_INTER_BROKER_LISTENER_NAME",
                             "INTERNAL")
                    .waitingFor(Wait.forLogMessage(
                            ".*INFO\s+\[KafkaServer\s+id=\d+\]" +
                            "\s+started\s+\(kafka.server.KafkaServer\).*",
                            1));

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    private List<String> payloadsReceived = new ArrayList<>();

    @KafkaListener(autoStartup = "false", topics = "topic1")
    public void onMessage(@Payload String payload) {
        payloadsReceived.add(payload);
    }

    @BeforeEach
    public void beforeEach() {
        payloadsReceived.clear();
        for(MessageListenerContainer listenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
            listenerContainer.start();
        }
        sleep(2_000); // Just for the spike. (Eliminates code checking the listener container states.)
    }

    @AfterEach
    public void afterEach() {
        for(MessageListenerContainer listenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
            listenerContainer.stop();
        }
        sleep(2_000); // Just for the spike. (Eliminates code checking the listener container states.)
    }

    @Test
    @Timeout(value = 3, unit = TimeUnit.SECONDS)
    public void test() {
        kafkaTemplate.send("topic1", "Hello World!");
        sleep(1_000); // Just for the spike. (Wait for message in production test, with test timeout.)
        assertAll(
                () -> assertEquals(1, payloadsReceived.size()),
                () -> assertEquals("Hello World!", payloadsReceived.get(0))
                 );
    }

    private void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch(InterruptedException e) {
        }
    }
}

gradle.build

plugins {
    id 'org.springframework.boot' version '2.2.7.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'java'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.kafka:spring-kafka'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
    testImplementation 'org.springframework.kafka:spring-kafka-test'
    // for TestContainers
    testCompile group: 'org.testcontainers', name: 'testcontainers', version: "1.14.1"
    testCompile group: 'org.testcontainers', name: 'junit-jupiter', version: "1.14.1"
}

test {
    useJUnitPlatform()
}

Testcontainers' Kafka module存在的原因:)

它通过延迟 运行 Kafka 的进程来处理端口设置,以便它可以提供实际分配的随机端口作为 ADVERTISED_HOST env 变量。

尝试一下,或查看来源以获取一些灵感:
https://github.com/testcontainers/testcontainers-java/tree/master/modules/kafka