Kafkalistener SpringBoot 故障安全
Kafkalistener SpringBoot failsafe
我 运行 spring 使用 KafkaListener 作为我的客户端启动。问题是我们如何从失败的 kafka 配置中恢复并避免应用程序以 Process finished with exit code 0
停止。
不正确配置的一个例子是,例如一个不正确的端点 url。如果无法访问 Kafka 服务器,同样的情况也适用。所以在任何情况下,KafkaListner 进程都不应该杀死服务器。
@Bean
open fun consumerFactory(): ConsumerFactory<String, String> {
val deserializer = JsonDeserializer<Thing>()
deserializer.addTrustedPackages("de.data.Thing")
val props: MutableMap<String, Any> = HashMap()
val serverUrl = server.substringBefore(":")
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = server
props[ConsumerConfig.GROUP_ID_CONFIG] = "group"
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = "SASL_SSL"
props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = false
props[SaslConfigs.SASL_MECHANISM] = "PLAIN"
props[SaslConfigs.SASL_JAAS_CONFIG] = "org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"$ConnectionString\" " +
"password=\"Endpoint=sb://$serverUrl/;" +
"SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=$sharedSecret\";"
return DefaultKafkaConsumerFactory(props,
StringDeserializer(), StringDeserializer())
}
@Bean
open fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String>? {
val factory: ConcurrentKafkaListenerContainerFactory<String, String> = ConcurrentKafkaListenerContainerFactory<String, String>()
factory.consumerFactory = consumerFactory()
factory.setMessageConverter(BytesJsonMessageConverter())
return factory
}
@KafkaListener(topics = ["topic"],
groupId = "group",
containerFactory = "kafkaListenerContainerFactory",
)
fun listenThingsChanged(@Payload thing: Thing,
record: ConsumerRecord<String, String>,
@Headers headers: MessageHeaders) {
....
}
org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185)
at org.springframework.context.support.DefaultLifecycleProcessor.access0(DefaultLifecycleProcessor.java:53)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:895)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:554)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)
at de.x.ServerAppKt.main(ServerApp.kt:11)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:825)
at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:631)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:340)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:308)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:293)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:267)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:241)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.(KafkaMessageListenerContainer.java:606)
at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:302)
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338)
at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:204)
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338)
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312)
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
... 19 common frames omitted
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:89)
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48)
at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:737)
... 33 common frames omitted
如果代理刚刚关闭,应用程序将正常启动(对于早于 2.3.4 的版本,您必须在容器属性上将 missingTopicsFatal
设置为 false(此后默认为 false) .
No resolvable bootstrap urls given in...
这是致命的 - 它是无法恢复的。
但是,您可以设置 autoStartup=false
- 在 @KafkaListener
或容器工厂上。
这将防止 Spring 在应用程序初始化期间尝试启动容器。
然后您可以在 try/catch 块中自己启动容器。
我 运行 spring 使用 KafkaListener 作为我的客户端启动。问题是我们如何从失败的 kafka 配置中恢复并避免应用程序以 Process finished with exit code 0
停止。
不正确配置的一个例子是,例如一个不正确的端点 url。如果无法访问 Kafka 服务器,同样的情况也适用。所以在任何情况下,KafkaListner 进程都不应该杀死服务器。
@Bean
open fun consumerFactory(): ConsumerFactory<String, String> {
val deserializer = JsonDeserializer<Thing>()
deserializer.addTrustedPackages("de.data.Thing")
val props: MutableMap<String, Any> = HashMap()
val serverUrl = server.substringBefore(":")
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = server
props[ConsumerConfig.GROUP_ID_CONFIG] = "group"
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = "SASL_SSL"
props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = false
props[SaslConfigs.SASL_MECHANISM] = "PLAIN"
props[SaslConfigs.SASL_JAAS_CONFIG] = "org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"$ConnectionString\" " +
"password=\"Endpoint=sb://$serverUrl/;" +
"SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=$sharedSecret\";"
return DefaultKafkaConsumerFactory(props,
StringDeserializer(), StringDeserializer())
}
@Bean
open fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String>? {
val factory: ConcurrentKafkaListenerContainerFactory<String, String> = ConcurrentKafkaListenerContainerFactory<String, String>()
factory.consumerFactory = consumerFactory()
factory.setMessageConverter(BytesJsonMessageConverter())
return factory
}
@KafkaListener(topics = ["topic"],
groupId = "group",
containerFactory = "kafkaListenerContainerFactory",
)
fun listenThingsChanged(@Payload thing: Thing,
record: ConsumerRecord<String, String>,
@Headers headers: MessageHeaders) {
....
}
org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) at org.springframework.context.support.DefaultLifecycleProcessor.access0(DefaultLifecycleProcessor.java:53) at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:895) at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:554) at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143) at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758) at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750) at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) at de.x.ServerAppKt.main(ServerApp.kt:11) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:825) at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:631) at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:340) at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:308) at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:293) at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:267) at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:241) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.(KafkaMessageListenerContainer.java:606) at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:302) at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338) at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:204) at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338) at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312) at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257) at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ... 19 common frames omitted Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:89) at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48) at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:737) ... 33 common frames omitted
如果代理刚刚关闭,应用程序将正常启动(对于早于 2.3.4 的版本,您必须在容器属性上将 missingTopicsFatal
设置为 false(此后默认为 false) .
No resolvable bootstrap urls given in...
这是致命的 - 它是无法恢复的。
但是,您可以设置 autoStartup=false
- 在 @KafkaListener
或容器工厂上。
这将防止 Spring 在应用程序初始化期间尝试启动容器。
然后您可以在 try/catch 块中自己启动容器。