KafkaTestUtils.getRecords() or KafkaTestUtils.getSingleRecord() sometimes takes too long
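I am testing a Spring Boot application that produces Kafka messages, and in the test I create a consumer to verify that the messages are sent correctly. When I use KafkaTestUtils.getRecords() or KafkaTestUtils.getSingleRecord() in the test, the time it takes to receive the records varies a lot from run to run. Sometimes it takes 1 second and sometimes 20 seconds. Is this expected? Is there any way to improve performance?
Here is the test: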
void ProducerTest() {
//given
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//Running docker kafka locally
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DTODeserializer.class);
ConsumerFactory<String, DTO> cf = new DefaultKafkaConsumerFactory<>(props);
Consumer<String, DTO> consumerTest = cf.createConsumer();
consumerTest.subscribe(Collections.singleton("topic"));
//when
//call to the REST API that produces the Kafka message in the "topic"
//then
ConsumerRecords<String, DTO> records = KafkaTestUtils.getRecords(consumerTest);
//Assert
}
This must be a problem with your Docker container or your producer; for me it is consistent with a local broker (under half a second).
@Test
void ProducerTest() throws Exception {
// given
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
ConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(props);
Consumer<String, String> consumerTest = cf.createConsumer();
consumerTest.subscribe(Collections.singleton("topic"));
// when
props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
ProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(props);
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
template.send("topic", "foo").get(10, TimeUnit.SECONDS);
// then
ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumerTest);
ConsumerRecord<String, String> record = records.iterator().next();
assertThat(record).isNotNull();
assertThat(record.value()).isEqualTo("foo");
pf.reset();
consumerTest.close();
}
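To find out whether the time is lost in the producing step or in the consuming step, it can help to time each phase of the test separately. Below is a minimal sketch using only the JDK; StepTimer, Timed and timed are hypothetical names made up for this illustration and are not part of spring-kafka or kafka-clients.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not part of spring-kafka): times one step of a test. */
public class StepTimer {

    /** Result of a timed step: the value the step returned and how long it took. */
    public static final class Timed<T> {
        public final T value;
        public final long elapsedMillis;

        Timed(T value, long elapsedMillis) {
            this.value = value;
            this.elapsedMillis = elapsedMillis;
        }
    }

    /** Runs the step, prints its duration, and returns the value with the elapsed time. */
    public static <T> Timed<T> timed(String label, Callable<T> step) throws Exception {
        long start = System.nanoTime();
        T value = step.call();
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println(label + " took " + elapsedMillis + " ms");
        return new Timed<>(value, elapsedMillis);
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a slow step; in the real test this would be the
        // REST call or the KafkaTestUtils.getRecords(...) call.
        Timed<String> result = timed("simulated step", () -> {
            Thread.sleep(50);
            return "done";
        });
        System.out.println(result.value);
    }
}
```

In the test above, the consuming step could be wrapped as StepTimer.timed("getRecords", () -> KafkaTestUtils.getRecords(consumerTest)).value, and the REST call in the same way, with the test method declared as throws Exception. If the REST call is fast and getRecords is slow, the delay is on the consumer or broker side; if it is the other way round, look at the producer.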