How to test EmbeddedKafka with Spring Boot
I ran into a problem testing my Kafka producer after switching from a custom Producer to KafkaTemplate.
For testing purposes I wrote the following class:
public class KafkaTestingTools {
    private static Map<String, Consumer<Long, GenericData.Record>> consumers = new HashMap<>();

    public static void sendMessage(String topic, String key, Object message, Schema schema) throws InterruptedException {
        Properties properties = new Properties();
        properties.put("schema.registry.url", "http://localhost:8081");
        properties.put("bootstrap.servers", "http://localhost:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("linger.ms", 1);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "com.logistics.mock.CustomKafkaAvroSerializer");
        KafkaProducer<String, Object> producer = new KafkaProducer<>(properties);
        CustomKafkaAvroDeserializer.setTopicScheme(topic, schema);
        ProducerRecord<String, Object> record = new ProducerRecord<>(topic, key, message);
        producer.send(record);
        producer.close();
    }

    public static void registerConsumerContainer(EmbeddedKafkaBroker embeddedKafka, String topic, Schema schema) throws InterruptedException {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup" + UUID.randomUUID().toString(), "true", embeddedKafka);
        consumerProps.put("schema.registry.url", "http://localhost:8081");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "com.logistics.mock.CustomKafkaAvroDeserializer");
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        ConsumerFactory<Long, GenericData.Record> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        Consumer<Long, GenericData.Record> consumer = cf.createConsumer();
        consumers.put(topic, consumer);
        embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topic);
        CustomKafkaAvroDeserializer.setTopicScheme(topic, schema);
    }

    public static Object getSingleRecordFromRegisteredContainer(EmbeddedKafkaBroker embeddedKafka, String topic) {
        return SpecificData.get().deepCopy(
            CustomKafkaAvroDeserializer.getTopicScheme(topic),
            KafkaTestUtils.getSingleRecord(consumers.get(topic), topic).value()
        );
    }
}
Producer example:
@Service
@CommonsLog
public class PointProducer {
    private final KafkaTemplate<String, ExportMessage> kafkaTemplate;
    private final String topic;

    @Autowired
    public PointProducer(@Value("${kafka.producer.points}") String topic,
                         KafkaTemplate<String, ExportMessage> kafkaTemplate) {
        this.topic = topic;
        this.kafkaTemplate = kafkaTemplate;
    }

    public void produce(Point point) {
        var message = new ExportMessage();
        message.setId(point.getId());
        log.warn("produce point: " + message.toString());
        kafkaTemplate.send(topic, point.getId().toString(), message);
        kafkaTemplate.flush();
    }
}
Kafka configuration:
spring:
  kafka:
    bootstrap-servers: ${spring.embedded.kafka.brokers}
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      point-deserializer: com.logistics.mock.CustomKafkaAvroDeserializer
      auto-offset-reset: latest
      group-id: credit_file_test
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.logistics.mock.CustomKafkaAvroSerializer
    schema-registry-url: http://localhost:8081
kafka.consumer.points: points_export
kafka.producer.files: common.file
kafka.producer.orders: common.order
kafka.producer.points: common.point
The test looks like this:
@SpringBootTest
@TestMethodOrder(OrderAnnotation.class)
@EmbeddedKafka(partitions = 1, topics = { "topic1", "topic2" }, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class ApplicationLogisticOrderTest {
    @Test
    @Order(1)
    @WithMockUser(roles = "ADMIN")
    void checkSendToKafka() throws Exception {
        KafkaTestingTools.registerConsumerContainer(this.embeddedKafka, TOPIC1, Message.SCHEMA$);
        Thread.sleep(3000);
        prepareCustomizedLogisticOrder(t -> {
        });
        var mockMvc = MockMvcBuilders.webAppContextSetup(this.wac).build();
        mockMvc.perform(MockMvcRequestBuilders.put("/orders/7000000/sendToKafka"));
    }
}
On execution I get the following:
Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
    at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:478)
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:468)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129)
    at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.<init>(AbstractKafkaSchemaSerDeConfig.java:177)
    at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:50)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:376)
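I tried putting it in application.yml and in the KafkaTestingTools properties, but nothing changed. It looks like Spring is looking for this property somewhere else.
Maybe someone has run into this situation and knows how to solve it?
Thanks in advance.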
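The problem is here:
spring:
  kafka:
    schema-registry-url: http://localhost:8081
Spring Boot manages no such property. Moreover, the key schema-registry-url does not match schema.registry.url, which is the key the serializer reports as missing.
Consider changing it to this: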
spring:
  kafka:
    producer:
      properties:
        "schema.registry.url": http://localhost:8081
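To illustrate why the key spelling matters, here is a minimal plain-Java sketch. It does not use Spring or Kafka: the class SchemaRegistryKeyCheck and its methods are hypothetical names made up for this example, and producerConfig only imitates the way entries under spring.kafka.producer.properties end up, unchanged, in the map handed to the Kafka producer. The serializer then looks up the exact key schema.registry.url in that map.

```java
import java.util.HashMap;
import java.util.Map;

public class SchemaRegistryKeyCheck {

    // Exact key named in the ConfigException from the question.
    static final String REQUIRED_KEY = "schema.registry.url";

    // Hypothetical stand-in: copies the extra properties verbatim into the
    // producer configuration, without renaming or normalizing any key.
    static Map<String, Object> producerConfig(Map<String, String> extraProperties) {
        Map<String, Object> config = new HashMap<>();
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.putAll(extraProperties);
        return config;
    }

    // Mirrors the serializer's requirement: the key must be present exactly as spelled.
    static boolean hasRegistryUrl(Map<String, Object> config) {
        return config.containsKey(REQUIRED_KEY);
    }

    public static void main(String[] args) {
        Map<String, Object> wrong = producerConfig(Map.of("schema-registry-url", "http://localhost:8081"));
        Map<String, Object> right = producerConfig(Map.of(REQUIRED_KEY, "http://localhost:8081"));
        System.out.println("schema-registry-url found: " + hasRegistryUrl(wrong));
        System.out.println("schema.registry.url found: " + hasRegistryUrl(right));
    }
}
```

The first lookup fails and the second succeeds, which matches the "Missing required configuration" error in the question. If the consumer side also uses a Confluent-based deserializer, it will probably need the same key, either under spring.kafka.consumer.properties or under spring.kafka.properties, which applies to both producer and consumer.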