Spring Kafka 未启动时启动应用程序未启动
Spring Boot app not starting when Kafka is not up
我有一个 Spring 启动应用程序,其中有一个 Kafka 消费者和生产者。还有一个创建话题的bean。
例如
@KafkaListener(topics = "myTopic")
public void doSomething() {
// do something on receipt of the message
}
@Bean
public NewTopic topic(){
return TopicBuilder.name("myTopic")
.partitions(2)
.
我的 Spring 引导应用程序和 Kafka 在 Kubernetes 中的 Docker 中启动。有时 Spring Boot 应用程序会在 Kafka pod 启动之前启动,因此无法启动,因为消费者无法连接(请参阅堆栈跟踪)。
有没有一种方法可以让我的应用程序以弹性方式启动?例如,消费者应该应对启动时不存在的 Kafka 或应用程序 运行 ?
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:827)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:629)
at org.springframework.kafka.core.Def Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:827)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:629)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:207)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:193)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:167)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:141)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:607)
at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:329)
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:340)
at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:176)
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:340)
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312)
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
... 59 common frames omitted
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:88)
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:47)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:735)aultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:207)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:193)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:167)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:141)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:607)
at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:329)
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:340)
at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:176)
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:340)
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312)
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
... 59 common frames omitted
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:88)
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:47)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:735)
您可以在侦听器上设置 autostartup = "false"
并自己启动它(使用 KafkaListenerEndpointRegistry
- 给侦听器一个 id
以便您可以从注册表中获取对其容器的引用).
如果代理不可用,KafkaAdmin
将不会创建主题;您还需要致电 KafkaAdmin.initialize()
:
/**
* Call this method to check/add topics; this might be needed if the broker was not
* available when the application context was initialized, and
* {@link #setFatalIfBrokerNotAvailable(boolean) fatalIfBrokerNotAvailable} is false,
* or {@link #setAutoCreate(boolean) autoCreate} was set to false.
* @return true if successful.
* @see #setFatalIfBrokerNotAvailable(boolean)
* @see #setAutoCreate(boolean)
*/
public final boolean initialize() {
我有一个 Spring 启动应用程序,其中有一个 Kafka 消费者和生产者。还有一个创建话题的bean。
例如
@KafkaListener(topics = "myTopic")
public void doSomething() {
// do something on receipt of the message
}
@Bean
public NewTopic topic(){
return TopicBuilder.name("myTopic")
.partitions(2)
.
我的 Spring 引导应用程序和 Kafka 在 Kubernetes 中的 Docker 中启动。有时 Spring Boot 应用程序会在 Kafka pod 启动之前启动,因此无法启动,因为消费者无法连接(请参阅堆栈跟踪)。
有没有一种方法可以让我的应用程序以弹性方式启动?例如,消费者应该应对启动时不存在的 Kafka 或应用程序 运行 ?
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:827)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:629)
at org.springframework.kafka.core.Def Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:827)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:629)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:207)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:193)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:167)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:141)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:607)
at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:329)
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:340)
at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:176)
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:340)
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312)
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
... 59 common frames omitted
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:88)
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:47)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:735)aultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:207)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:193)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:167)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:141)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:607)
at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:329)
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:340)
at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:176)
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:340)
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312)
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
... 59 common frames omitted
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:88)
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:47)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:735)
您可以在侦听器上设置 autostartup = "false"
并自己启动它(使用 KafkaListenerEndpointRegistry
- 给侦听器一个 id
以便您可以从注册表中获取对其容器的引用).
如果代理不可用,KafkaAdmin
将不会创建主题;您还需要致电 KafkaAdmin.initialize()
:
/**
* Call this method to check/add topics; this might be needed if the broker was not
* available when the application context was initialized, and
* {@link #setFatalIfBrokerNotAvailable(boolean) fatalIfBrokerNotAvailable} is false,
* or {@link #setAutoCreate(boolean) autoCreate} was set to false.
* @return true if successful.
* @see #setFatalIfBrokerNotAvailable(boolean)
* @see #setAutoCreate(boolean)
*/
public final boolean initialize() {