如何配置 Spring 启动 Kafka 客户端使其不尝试连接
How to configure Spring Boot Kafka client so it does not try to connect
这与 有关,但我仍然认为这是一个 不同的 问题:)
我们需要配置 Spring 启动 Kafka 客户端,这样它就不会尝试连接。
用例是在测试环境中我们没有 Kafka 运行 但我们仍然需要构建完整的 Spring 启动上下文,因此使这个 bean 以配置文件为条件是行不通的.我们不关心 been 是否未连接,但我们需要它存在。
问题是连接尝试失败大约需要 30-40 秒,我们的测试速度明显变慢。
configuration parameters 中的哪一个或它们的哪个组合完全禁止连接尝试,或者至少强制客户端只尝试一次?
多次重试连接的代码是这样的:
@Bean
public KafkaAdmin.NewTopics topics() {
return new KafkaAdmin.NewTopics(
TopicBuilder.name("MyTopic").build()
);
}
它反复产生这个警告:
WARN ... org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.
以下代码仅尝试连接一次:
@Bean
public ReactiveKafkaConsumerTemplate<String, MyEvent> myConsumer(KafkaProperties properties) {
return createConsumer(properties, "MyTopic", "MyConsumerGroup");
}
public <E> ReactiveKafkaConsumerTemplate<String, E> createConsumer(KafkaProperties properties, String topic, String consumerGroup) {
final Map<String, Object> map = configureKafkaProperties(properties, consumerGroup);
return new ReactiveKafkaConsumerTemplate<>(
ReceiverOptions.<String, E>create(map)
.subscription(List.of(topic)));
}
生产
WARN 7268 ... org.apache.kafka.clients.NetworkClient : Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.
我也试过设置 属性
spring.kafka.admin.fail-fast=true
但这似乎完全没有效果。
Spring 引导自动配置一个 KafkaAdmin
,默认情况下,它将连接到代理以创建任何 NewTopic
bean。您可以将其 autoCreate
属性 设置为 false。
/**
* Set to false to suppress auto creation of topics during context initialization.
* @param autoCreate boolean flag to indicate creating topics or not during context initialization
* @see #initialize()
*/
public void setAutoCreate(boolean autoCreate) {
编辑
要获得对 KafkaAdmin
的引用,只需将其作为参数添加到任何 bean 定义中即可。
例如
@Bean
public KafkaAdmin.NewTopics topics(KafkaAdmin admin) {
admin.setAutoCreate(false);
return new KafkaAdmin.NewTopics(
TopicBuilder.name("MyTopic").build()
);
}
另见 KafkaAdmin.initialize()
。
/**
* Call this method to check/add topics; this might be needed if the broker was not
* available when the application context was initialized, and
* {@link #setFatalIfBrokerNotAvailable(boolean) fatalIfBrokerNotAvailable} is false,
* or {@link #setAutoCreate(boolean) autoCreate} was set to false.
* @return true if successful.
* @see #setFatalIfBrokerNotAvailable(boolean)
* @see #setAutoCreate(boolean)
*/
public final boolean initialize() {
当使用 @KafkaListener
设置 autoStartup = "false"
以防止消费者在上下文初始化时启动。
使用 reactor,只是不要订阅 receive*()
方法返回的 Flux
(这是触发消费者创建的原因)。
这与
我们需要配置 Spring 启动 Kafka 客户端,这样它就不会尝试连接。
用例是在测试环境中我们没有 Kafka 运行 但我们仍然需要构建完整的 Spring 启动上下文,因此使这个 bean 以配置文件为条件是行不通的.我们不关心 been 是否未连接,但我们需要它存在。
问题是连接尝试失败大约需要 30-40 秒,我们的测试速度明显变慢。
configuration parameters 中的哪一个或它们的哪个组合完全禁止连接尝试,或者至少强制客户端只尝试一次?
多次重试连接的代码是这样的:
@Bean
public KafkaAdmin.NewTopics topics() {
return new KafkaAdmin.NewTopics(
TopicBuilder.name("MyTopic").build()
);
}
它反复产生这个警告:
WARN ... org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.
以下代码仅尝试连接一次:
@Bean
public ReactiveKafkaConsumerTemplate<String, MyEvent> myConsumer(KafkaProperties properties) {
return createConsumer(properties, "MyTopic", "MyConsumerGroup");
}
public <E> ReactiveKafkaConsumerTemplate<String, E> createConsumer(KafkaProperties properties, String topic, String consumerGroup) {
final Map<String, Object> map = configureKafkaProperties(properties, consumerGroup);
return new ReactiveKafkaConsumerTemplate<>(
ReceiverOptions.<String, E>create(map)
.subscription(List.of(topic)));
}
生产
WARN 7268 ... org.apache.kafka.clients.NetworkClient : Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.
我也试过设置 属性
spring.kafka.admin.fail-fast=true
但这似乎完全没有效果。
Spring 引导自动配置一个 KafkaAdmin
,默认情况下,它将连接到代理以创建任何 NewTopic
bean。您可以将其 autoCreate
属性 设置为 false。
/**
* Set to false to suppress auto creation of topics during context initialization.
* @param autoCreate boolean flag to indicate creating topics or not during context initialization
* @see #initialize()
*/
public void setAutoCreate(boolean autoCreate) {
编辑
要获得对 KafkaAdmin
的引用,只需将其作为参数添加到任何 bean 定义中即可。
例如
@Bean
public KafkaAdmin.NewTopics topics(KafkaAdmin admin) {
admin.setAutoCreate(false);
return new KafkaAdmin.NewTopics(
TopicBuilder.name("MyTopic").build()
);
}
另见 KafkaAdmin.initialize()
。
/**
* Call this method to check/add topics; this might be needed if the broker was not
* available when the application context was initialized, and
* {@link #setFatalIfBrokerNotAvailable(boolean) fatalIfBrokerNotAvailable} is false,
* or {@link #setAutoCreate(boolean) autoCreate} was set to false.
* @return true if successful.
* @see #setFatalIfBrokerNotAvailable(boolean)
* @see #setAutoCreate(boolean)
*/
public final boolean initialize() {
当使用 @KafkaListener
设置 autoStartup = "false"
以防止消费者在上下文初始化时启动。
使用 reactor,只是不要订阅 receive*()
方法返回的 Flux
(这是触发消费者创建的原因)。