如何使用 Spring Boot 测试嵌入式 Kafka

How to test EmbeddedKafka with SpringBoot

我在将自定义 Producer 更改为 KafkaTemplate 后测试 Kafka Producer 时遇到问题。

为了测试原因我写了下一个class:

public class KafkaTestingTools {

    static private Map<String, Consumer<Long, GenericData.Record>> consumers = new HashMap<>();

    static public void sendMessage (String topic, String key, Object message, Schema schema) throws InterruptedException{
        Properties properties = new Properties();
        properties.put("schema.registry.url", "http://localhost:8081");
        properties.put("bootstrap.servers", "http://localhost:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("linger.ms", 1);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "com.logistics.mock.CustomKafkaAvroSerializer");
        KafkaProducer<String, Object> producer = new KafkaProducer<>(properties);

        CustomKafkaAvroDeserializer.setTopicScheme(topic, schema);

        ProducerRecord<String, Object> record = new ProducerRecord<>(topic, key, message);
        producer.send(record);
        producer.close();
    }

    static public void registerConsumerContainer(EmbeddedKafkaBroker embeddedKafka, String topic, Schema schema) throws InterruptedException{
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup" + UUID.randomUUID().toString(), "true", embeddedKafka);
        consumerProps.put("schema.registry.url", "http://localhost:8081");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "com.logistics.mock.CustomKafkaAvroDeserializer");
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        ConsumerFactory<Long, GenericData.Record> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        Consumer<Long, GenericData.Record> consumer = cf.createConsumer();
        consumers.put(topic, consumer);

        embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topic);

        CustomKafkaAvroDeserializer.setTopicScheme(topic, schema);
    }

    static public Object getSingleRecordFromRegisteredContainer(EmbeddedKafkaBroker embeddedKafka, String topic){
        return SpecificData.get().deepCopy(
            CustomKafkaAvroDeserializer.getTopicScheme(topic),
            KafkaTestUtils.getSingleRecord(consumers.get(topic), topic).value()
        );
    }
    
}

生产者示例:

@Service
@CommonsLog
public class PointProducer {

    private final KafkaTemplate<String, ExportMessage> kafkaTemplate;
    private final String topic;

    @Autowired
    public PointProducer(@Value("${kafka.producer.points}") String topic,
            KafkaTemplate<String, ExportMessage> kafkaTemplate) {
        this.topic = topic;
        this.kafkaTemplate = kafkaTemplate;
    }

    public void produce(Point point) {
        var message = new ExportMessage();
        message.setId(point.getId());
        log.warn("produce point: " + message.toString());
        kafkaTemplate.send(topic, point.getId().toString(), message);
        kafkaTemplate.flush();
    }

kafka 配置

spring:
  kafka:
    bootstrap-servers: ${spring.embedded.kafka.brokers}
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      point-deserializer: com.logistics.mock.CustomKafkaAvroDeserializer
      auto-offset-reset: latest
      group-id: credit_file_test
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.logistics.mock.CustomKafkaAvroSerializer
    schema-registry-url: http://localhost:8081

kafka.consumer.points: points_export
kafka.producer.files: common.file
kafka.producer.orders: common.order
kafka.producer.points: common.point

测试看起来像:

@SpringBootTest
@TestMethodOrder(OrderAnnotation.class)
@EmbeddedKafka(partitions = 1, topics = { "topic1", "topic2" }, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class ApplicationLogisticOrderTest {
@Test
    @Order(1)
    @WithMockUser(roles = "ADMIN")
    void checkSendToKafka() throws Exception {
        KafkaTestingTools.registerConsumerContainer(this.embeddedKafka, TOPIC1, Message.SCHEMA$);
        Thread.sleep(3000);
        prepareCustomizedLogisticOrder(t -> {
        });
        var mockMvc = MockMvcBuilders.webAppContextSetup(this.wac).build();
        mockMvc.perform(MockMvcRequestBuilders.put("/orders/7000000/sendToKafka"));

}

并且在线执行我发现:

Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:478)
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:468)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129)
    at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.<init>(AbstractKafkaSchemaSerDeConfig.java:177)
    at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:50)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:376)

我试着把它放在 application.yml 中,在 KafkaTestingTools 属性中,但没有任何改变,看起来 Spring 在另一个地方寻找这个 属性。

也许有人遇到过这种情况并知道解决办法?

提前致谢。

问题出在这里:

spring: 卡夫卡: 架构注册表-url: http://localhost:8081

属性 没有 Spring Boot 管理的 属性。 更重要的是这个 schema-registry-url 不适合那个 schema.registry.url.

你要考虑改成这样:

spring:
  kafka:
    producer:
      properties:
        "schema.registry.url": http://localhost:8081

有关详细信息,请参阅文档:https://docs.spring.io/spring-boot/docs/current/reference/html/features.html#features.messaging.kafka.additional-properties