Spring kafka unit test listener not subscribing to topic
I have a sample project to explore Spring with Kafka (find here). I have a listener subscribing to the topic my-test-topic-upstream which just picks up the message and the key and publishes it to another topic my-test-topic-downstream. I tried this against a local Kafka (the docker-compose file is there) and it works.
Now I'm trying to write a test for this using an embedded Kafka server. In the test I have an embedded server starting up (TestContext.java), which should start before the tests (overridden JUnit beforeAll).
private static EmbeddedKafkaBroker kafka() {
    EmbeddedKafkaBroker kafkaEmbedded =
            new EmbeddedKafkaBroker(
                    3,
                    false,
                    1,
                    "my-test-topic-upstream", "my-test-topic-downstream");
    Map<String, String> brokerProperties = new HashMap<>();
    brokerProperties.put("default.replication.factor", "1");
    brokerProperties.put("offsets.topic.replication.factor", "1");
    brokerProperties.put("group.initial.rebalance.delay.ms", "3000");
    kafkaEmbedded.brokerProperties(brokerProperties);
    try {
        kafkaEmbedded.afterPropertiesSet();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    return kafkaEmbedded;
}
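The broker is started from a JUnit 5 lifecycle hook, roughly like this (a simplified sketch; the actual setup lives in TestContext.java, and the class and method names here are illustrative):

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.kafka.test.EmbeddedKafkaBroker;

class TickListenerTest {

    private static EmbeddedKafkaBroker broker;

    @BeforeAll
    static void startBroker() {
        broker = kafka(); // the factory method shown above, assumed to live in this class
    }

    @AfterAll
    static void stopBroker() {
        broker.destroy(); // shuts the embedded brokers down after all tests
    }
}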
Then I create a producer (TickProducer) and publish a message to the topic that I expect my listener to consume.
public TickProducer(String brokers) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producer = new KafkaProducer<>(props);
}

public RecordMetadata publishTick(String brand)
        throws ExecutionException, InterruptedException {
    return publish(TOPIC, brand, Instant.now().toString());
}

private RecordMetadata publish(String topic, String key, String value)
        throws ExecutionException, InterruptedException {
    final RecordMetadata recordMetadata;
    recordMetadata = producer.send(new ProducerRecord<>(topic, key, value)).get();
    producer.flush();
    return recordMetadata;
}
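In the test the producer is pointed at the embedded broker rather than at localhost, along these lines (simplified; getBrokersAsString() returns the embedded broker's bootstrap address list, and the key used here is illustrative):

// Wire the producer to the embedded broker started above and publish one tick.
EmbeddedKafkaBroker broker = kafka();
TickProducer producer = new TickProducer(broker.getBrokersAsString());
producer.publishTick("some-brand"); // illustrative key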
I see the following log message getting logged over and over.
11:32:35.745 [main] WARN o.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=my-test-group] Connection to node -1 could not be established. Broker may not be available.
and it finally fails with
11:36:52.774 [main] ERROR o.s.boot.SpringApplication - Application run failed
org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
Any hints here?
Look at the INFO log output of the ConsumerConfig to see where it is trying to connect (and compare it with the ProducerConfig). I suspect you haven't updated the Spring Boot bootstrap-servers property to point at the embedded broker.
See
/**
 * Set the system property with this name to the list of broker addresses.
 * @param brokerListProperty the brokerListProperty to set
 * @return this broker.
 * @since 2.3
 */
public EmbeddedKafkaBroker brokerListProperty(String brokerListProperty) {
    this.brokerListProperty = brokerListProperty;
    return this;
}
Set that to spring.kafka.bootstrap-servers, which will then be used instead of SPRING_EMBEDDED_KAFKA_BROKERS.
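For example, applied to the factory method from the question (a sketch; only the relevant lines change):

// Publish the embedded broker's addresses under the Spring Boot property name so the
// auto-configured consumer and producer factories connect to the embedded broker.
private static EmbeddedKafkaBroker kafka() {
    EmbeddedKafkaBroker kafkaEmbedded =
            new EmbeddedKafkaBroker(
                    3,
                    false,
                    1,
                    "my-test-topic-upstream", "my-test-topic-downstream")
                .brokerListProperty("spring.kafka.bootstrap-servers");
    try {
        kafkaEmbedded.afterPropertiesSet(); // starts the brokers and sets the property
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    return kafkaEmbedded;
}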
By the way, it's usually easier to use the @EmbeddedKafka annotation than to instantiate the server yourself.
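A rough sketch of the annotation-driven variant (the test class name and test method are illustrative, not from the project):

import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;

// @EmbeddedKafka starts the broker for the test context and, via bootstrapServersProperty,
// exposes its address list under spring.kafka.bootstrap-servers so the listener connects to it.
@SpringBootTest
@EmbeddedKafka(
        partitions = 1,
        topics = { "my-test-topic-upstream", "my-test-topic-downstream" },
        bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class TickListenerAnnotationTest {

    @Test
    void listenerForwardsTick() {
        // publish to my-test-topic-upstream and assert on my-test-topic-downstream here
    }
}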