如何在 Spring 测试中创建嵌入式 Kafka 时修复 'Invalid URL'
How to fix 'Invalid URL' while creating embedded Kafka in Spring test
我正在尝试为一段将数据发送到 Kafka 主题的代码构建测试,我一直在阅读一个看起来非常简单的示例,但是当我尝试 运行 测试时,它无法抛出此错误:'Invalid url in bootstrap.servers: spring.embedded.kafka.brokers'
URL 'spring.embedded.kafka.brokers' 我从文档中得到它,但我现在找不到 link 来源。这是我到目前为止尝试过的:
@RunWith(MockitoJUnitRunner.class)
@SpringBootTest
@DirtiesContext
@TestPropertySource({"classpath:application-test.properties"})
public class PublishCustomerServiceImplTest {
private String bootstrapServers = "spring.embedded.kafka.brokers";
private static final String TOPIC = "TopicName";
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
return props;
}
public ProducerFactory<String, CustomerPublishRequest> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
public KafkaTemplate<String, CustomerPublishRequest> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TOPIC);
@Test
public void publishCustomerTest() throws Exception {
KafkaTemplate<String, CustomerPublishRequest> kafkaTemplate = kafkaTemplate();
ListenableFuture<SendResult<String, CustomerPublishRequest>> future = kafkaTemplate.send(TOPIC, CustomerRequestDummy.getCustomer());
SendResult<String, CustomerPublishRequest> sendResult = future.get();
long offset = sendResult.getRecordMetadata().offset();
}
}
如有任何帮助,我们将不胜感激。
错误信息很清楚这是spring.embedded.kafka.brokers
无效bootstrap-serverURL,从EmbeddedKafkaRule
获取
@RunWith(MockitoJUnitRunner.class)
@SpringBootTest
@DirtiesContext
@TestPropertySource({"classpath:application-test.properties"})
public class PublishCustomerServiceImplTest {
private static final String TOPIC = "TopicName";
@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TOPIC);
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getEmbeddedKafka().getBrokersAsString());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
return props;
}
public ProducerFactory<String, CustomerPublishRequest> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
public KafkaTemplate<String, CustomerPublishRequest> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Test
public void publishCustomerTest() throws Exception {
KafkaTemplate<String, CustomerPublishRequest> kafkaTemplate = kafkaTemplate();
ListenableFuture<SendResult<String, CustomerPublishRequest>> future = kafkaTemplate.send(TOPIC, CustomerRequestDummy.getCustomer());
SendResult<String, CustomerPublishRequest> sendResult = future.get();
long offset = sendResult.getRecordMetadata().offset();
}
}
遇到了同样的问题,然后通过覆盖以下两个属性修复了它,
@TestPropertySource(properties = {"spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}",
"spring.kafka.admin.properties.bootstrap.servers=${spring.embedded.kafka.brokers}"})
我正在尝试为一段将数据发送到 Kafka 主题的代码构建测试,我一直在阅读一个看起来非常简单的示例,但是当我尝试 运行 测试时,它无法抛出此错误:'Invalid url in bootstrap.servers: spring.embedded.kafka.brokers'
URL 'spring.embedded.kafka.brokers' 我从文档中得到它,但我现在找不到 link 来源。这是我到目前为止尝试过的:
@RunWith(MockitoJUnitRunner.class)
@SpringBootTest
@DirtiesContext
@TestPropertySource({"classpath:application-test.properties"})
public class PublishCustomerServiceImplTest {
private String bootstrapServers = "spring.embedded.kafka.brokers";
private static final String TOPIC = "TopicName";
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
return props;
}
public ProducerFactory<String, CustomerPublishRequest> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
public KafkaTemplate<String, CustomerPublishRequest> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TOPIC);
@Test
public void publishCustomerTest() throws Exception {
KafkaTemplate<String, CustomerPublishRequest> kafkaTemplate = kafkaTemplate();
ListenableFuture<SendResult<String, CustomerPublishRequest>> future = kafkaTemplate.send(TOPIC, CustomerRequestDummy.getCustomer());
SendResult<String, CustomerPublishRequest> sendResult = future.get();
long offset = sendResult.getRecordMetadata().offset();
}
}
如有任何帮助,我们将不胜感激。
错误信息很清楚这是spring.embedded.kafka.brokers
无效bootstrap-serverURL,从EmbeddedKafkaRule
@RunWith(MockitoJUnitRunner.class)
@SpringBootTest
@DirtiesContext
@TestPropertySource({"classpath:application-test.properties"})
public class PublishCustomerServiceImplTest {
private static final String TOPIC = "TopicName";
@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TOPIC);
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getEmbeddedKafka().getBrokersAsString());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
return props;
}
public ProducerFactory<String, CustomerPublishRequest> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
public KafkaTemplate<String, CustomerPublishRequest> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Test
public void publishCustomerTest() throws Exception {
KafkaTemplate<String, CustomerPublishRequest> kafkaTemplate = kafkaTemplate();
ListenableFuture<SendResult<String, CustomerPublishRequest>> future = kafkaTemplate.send(TOPIC, CustomerRequestDummy.getCustomer());
SendResult<String, CustomerPublishRequest> sendResult = future.get();
long offset = sendResult.getRecordMetadata().offset();
}
}
遇到了同样的问题,然后通过覆盖以下两个属性修复了它,
@TestPropertySource(properties = {"spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}",
"spring.kafka.admin.properties.bootstrap.servers=${spring.embedded.kafka.brokers}"})