Kafkalistener SpringBoot 故障安全

Kafkalistener SpringBoot failsafe

我 运行 spring 使用 KafkaListener 作为我的客户端启动。问题是我们如何从失败的 kafka 配置中恢复并避免应用程序以 Process finished with exit code 0 停止。 不正确配置的一个例子是,例如一个不正确的端点 url。如果无法访问 Kafka 服务器,同样的情况也适用。所以在任何情况下,KafkaListner 进程都不应该杀死服务器。

 @Bean
open fun consumerFactory(): ConsumerFactory<String, String> {
    val deserializer = JsonDeserializer<Thing>()
    deserializer.addTrustedPackages("de.data.Thing")

    val props: MutableMap<String, Any> = HashMap()
    val serverUrl = server.substringBefore(":")
    props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = server
    props[ConsumerConfig.GROUP_ID_CONFIG] = "group"
    props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
    props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
    props[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = "SASL_SSL"
    props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
    props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = false
    props[SaslConfigs.SASL_MECHANISM] = "PLAIN"
    props[SaslConfigs.SASL_JAAS_CONFIG] = "org.apache.kafka.common.security.plain.PlainLoginModule required " +
            "username=\"$ConnectionString\" " +
            "password=\"Endpoint=sb://$serverUrl/;" +
            "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=$sharedSecret\";"
    return DefaultKafkaConsumerFactory(props,
            StringDeserializer(), StringDeserializer())

}


@Bean
open fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String>? {
    val factory: ConcurrentKafkaListenerContainerFactory<String, String> = ConcurrentKafkaListenerContainerFactory<String, String>()
    factory.consumerFactory = consumerFactory()
    factory.setMessageConverter(BytesJsonMessageConverter())
    return factory
}

 @KafkaListener(topics = ["topic"],
        groupId = "group",
        containerFactory = "kafkaListenerContainerFactory",
)
fun listenThingsChanged(@Payload thing: Thing,
                        record: ConsumerRecord<String, String>,
                        @Headers headers: MessageHeaders) {

    ....
}

 

org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) at org.springframework.context.support.DefaultLifecycleProcessor.access0(DefaultLifecycleProcessor.java:53) at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:895) at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:554) at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143) at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758) at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750) at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) at de.x.ServerAppKt.main(ServerApp.kt:11) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:825) at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:631) at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:340) at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:308) at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:293) at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:267) at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:241) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.(KafkaMessageListenerContainer.java:606) at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:302) at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338) at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:204) at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338) at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312) at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257) at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ... 19 common frames omitted Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:89) at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48) at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:737) ... 33 common frames omitted

如果代理刚刚关闭,应用程序将正常启动(对于早于 2.3.4 的版本,您必须在容器属性上将 missingTopicsFatal 设置为 false(此后默认为 false) .

No resolvable bootstrap urls given in...

这是致命的 - 它是无法恢复的。

但是,您可以设置 autoStartup=false - 在 @KafkaListener 或容器工厂上。

这将防止 Spring 在应用程序初始化期间尝试启动容器。

然后您可以在 try/catch 块中自己启动容器。