Spring Kafka 未启动时启动应用程序未启动

Spring Boot app not starting when Kafka is not up

我有一个 Spring 启动应用程序,其中有一个 Kafka 消费者和生产者。还有一个创建话题的bean。

例如

@KafkaListener(topics = "myTopic")
    public void doSomething() {
      // do something on receipt of the message
    }


@Bean
public NewTopic topic(){
    return TopicBuilder.name("myTopic")
            .partitions(2)
            .

我的 Spring 引导应用程序和 Kafka 在 Kubernetes 中的 Docker 中启动。有时 Spring Boot 应用程序会在 Kafka pod 启动之前启动,因此无法启动,因为消费者无法连接(请参阅堆栈跟踪)。

有没有一种方法可以让我的应用程序以弹性方式启动?例如,消费者应该应对启动时不存在的 Kafka 或应用程序 运行 ?

Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:827)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:629)
    at org.springframework.kafka.core.Def    Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:827)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:629)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:207)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:193)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:167)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:141)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:607)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:329)
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:340)
    at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:176)
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:340)
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312)
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257)
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
    ... 59 common frames omitted
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:88)
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:47)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:735)aultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:207)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:193)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:167)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:141)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:607)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:329)
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:340)
    at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:176)
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:340)
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312)
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257)
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
    ... 59 common frames omitted
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:88)
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:47)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:735)

您可以在侦听器上设置 autostartup = "false" 并自己启动它(使用 KafkaListenerEndpointRegistry - 给侦听器一个 id 以便您可以从注册表中获取对其容器的引用).

如果代理不可用,KafkaAdmin 将不会创建主题;您还需要致电 KafkaAdmin.initialize():

/**
 * Call this method to check/add topics; this might be needed if the broker was not
 * available when the application context was initialized, and
 * {@link #setFatalIfBrokerNotAvailable(boolean) fatalIfBrokerNotAvailable} is false,
 * or {@link #setAutoCreate(boolean) autoCreate} was set to false.
 * @return true if successful.
 * @see #setFatalIfBrokerNotAvailable(boolean)
 * @see #setAutoCreate(boolean)
 */
public final boolean initialize() {