@KafkaListener 启动问题 (Spring)
Problem with start of @KafkaListener (Spring)
需要什么
我正在编写一个使用 Kafka 获取信息的应用程序 (Spring + Kotlin)。如果我在声明 @KafkaListener 时设置 autoStartup = "true" 那么应用程序可以正常工作,但前提是代理可用。当代理不可用时,应用程序在启动时崩溃。这是不受欢迎的行为。该应用程序必须运行并执行其他功能。
我尝试做的事情
为了避免在另一个主题中启动此站点上的某个人时应用程序崩溃,建议在声明 @KafkaListener[=41 时设置 autoStartup = "false" =].它确实有助于防止启动时崩溃。但是现在我无法成功手动启动 KafkaListener。在其他示例中,我看到 KafkaListenerEndpointRegistry 的自动连接,但是当我尝试这样做时:
@Service
class KafkaConsumer @Autowired constructor(
private val kafkaListenerEndpointRegistry: KafkaListenerEndpointRegistry
) {
IntelliJ Idea 警告:
Could not autowire. No beans of 'KafkaListenerEndpointRegistry' type found.
当我尝试在没有自动装配的情况下使用 KafkaListenerEndpointRegistry 并执行此代码时:
@Service
class KafkaConsumer {
private val logger = LoggerFactory.getLogger(this::class.java)
private val kafkaListenerEndpointRegistry = KafkaListenerEndpointRegistry()
@Scheduled(fixedDelay = 10000)
fun startCpguListener(){
val container = kafkaListenerEndpointRegistry.getListenerContainer("consumer1")
if (!container.isRunning)
try {
logger.info("Kafka Consumer is not running. Trying to start...")
container.start()
} catch (e: Exception){
logger.error(e.message)
}
}
@KafkaListener(
id = "consumer1",
topics = ["cpgdb.public.user"],
autoStartup = "false"
)
private fun listen(it: ConsumerRecord<JsonNode, JsonNode>, qwe: Consumer<Any, Any>){
val pay = it.value().get("payload")
val after = pay.get("after")
val id = after["id"].asInt()
val receivedUser = CpguUser(
id = id,
name = after["name"].asText()
)
logger.info("received user with id = $id")
}
}
}
kafkaListenerEndpointRegistry.getListenerContainer("consumer1")
始终 return 为空。我想这是因为我没有自动连接 kafkaListenerEndpointRegistry。我该怎么做?或者,如果存在我的答案的另一种解决方案,我将不胜感激!谢谢!
有Kafka配置:
@Configuration
@EnableConfigurationProperties(KafkaProperties::class)
class KafkaConfiguration(private val props: KafkaProperties) {
@Bean
fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<Any, Any> {
val factory = ConcurrentKafkaListenerContainerFactory<Any, Any>()
factory.consumerFactory = consumerFactory()
factory.setConcurrency(1)
factory.setMessageConverter(MessagingMessageConverter())
factory.setStatefulRetry(true)
val retryTemplate = RetryTemplate()
retryTemplate.setRetryPolicy(AlwaysRetryPolicy())
retryTemplate.setBackOffPolicy(ExponentialBackOffPolicy())
factory.setRetryTemplate(retryTemplate)
val handler = SeekToCurrentErrorHandler()
handler.isAckAfterHandle = false
factory.setErrorHandler(handler)
factory.containerProperties.isMissingTopicsFatal = false
return factory
}
@Bean
fun consumerFactory(): ConsumerFactory<Any, Any> {
return DefaultKafkaConsumerFactory(consumerConfigs())
}
@Bean
fun consumerConfigs(): Map<String, Any> {
return mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to props.bootstrap.address,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG to listOf(MonitoringConsumerInterceptor::class.java),
ConsumerConfig.CLIENT_ID_CONFIG to props.receiver.clientId,
ConsumerConfig.GROUP_ID_CONFIG to props.receiver.groupId,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
ConsumerConfig.ISOLATION_LEVEL_CONFIG to "read_committed",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true
)
}
}
- spring开机版本:2.3.0
- spring-kafka版本:2.5.3
- kafka客户端版本:2.5.0
忽略IntelliJ关于自动接线的警告;该bean 确实存在;只是IntelliJ检测不到而已
需要什么
我正在编写一个使用 Kafka 获取信息的应用程序 (Spring + Kotlin)。如果我在声明 @KafkaListener 时设置 autoStartup = "true" 那么应用程序可以正常工作,但前提是代理可用。当代理不可用时,应用程序在启动时崩溃。这是不受欢迎的行为。该应用程序必须运行并执行其他功能。
我尝试做的事情
为了避免在另一个主题中启动此站点上的某个人时应用程序崩溃,建议在声明 @KafkaListener[=41 时设置 autoStartup = "false" =].它确实有助于防止启动时崩溃。但是现在我无法成功手动启动 KafkaListener。在其他示例中,我看到 KafkaListenerEndpointRegistry 的自动连接,但是当我尝试这样做时:
@Service
class KafkaConsumer @Autowired constructor(
private val kafkaListenerEndpointRegistry: KafkaListenerEndpointRegistry
) {
IntelliJ Idea 警告:
Could not autowire. No beans of 'KafkaListenerEndpointRegistry' type found.
当我尝试在没有自动装配的情况下使用 KafkaListenerEndpointRegistry 并执行此代码时:
@Service
class KafkaConsumer {
private val logger = LoggerFactory.getLogger(this::class.java)
private val kafkaListenerEndpointRegistry = KafkaListenerEndpointRegistry()
@Scheduled(fixedDelay = 10000)
fun startCpguListener(){
val container = kafkaListenerEndpointRegistry.getListenerContainer("consumer1")
if (!container.isRunning)
try {
logger.info("Kafka Consumer is not running. Trying to start...")
container.start()
} catch (e: Exception){
logger.error(e.message)
}
}
@KafkaListener(
id = "consumer1",
topics = ["cpgdb.public.user"],
autoStartup = "false"
)
private fun listen(it: ConsumerRecord<JsonNode, JsonNode>, qwe: Consumer<Any, Any>){
val pay = it.value().get("payload")
val after = pay.get("after")
val id = after["id"].asInt()
val receivedUser = CpguUser(
id = id,
name = after["name"].asText()
)
logger.info("received user with id = $id")
}
}
}
kafkaListenerEndpointRegistry.getListenerContainer("consumer1")
始终 return 为空。我想这是因为我没有自动连接 kafkaListenerEndpointRegistry。我该怎么做?或者,如果存在我的答案的另一种解决方案,我将不胜感激!谢谢!
有Kafka配置:
@Configuration
@EnableConfigurationProperties(KafkaProperties::class)
class KafkaConfiguration(private val props: KafkaProperties) {
@Bean
fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<Any, Any> {
val factory = ConcurrentKafkaListenerContainerFactory<Any, Any>()
factory.consumerFactory = consumerFactory()
factory.setConcurrency(1)
factory.setMessageConverter(MessagingMessageConverter())
factory.setStatefulRetry(true)
val retryTemplate = RetryTemplate()
retryTemplate.setRetryPolicy(AlwaysRetryPolicy())
retryTemplate.setBackOffPolicy(ExponentialBackOffPolicy())
factory.setRetryTemplate(retryTemplate)
val handler = SeekToCurrentErrorHandler()
handler.isAckAfterHandle = false
factory.setErrorHandler(handler)
factory.containerProperties.isMissingTopicsFatal = false
return factory
}
@Bean
fun consumerFactory(): ConsumerFactory<Any, Any> {
return DefaultKafkaConsumerFactory(consumerConfigs())
}
@Bean
fun consumerConfigs(): Map<String, Any> {
return mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to props.bootstrap.address,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG to listOf(MonitoringConsumerInterceptor::class.java),
ConsumerConfig.CLIENT_ID_CONFIG to props.receiver.clientId,
ConsumerConfig.GROUP_ID_CONFIG to props.receiver.groupId,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
ConsumerConfig.ISOLATION_LEVEL_CONFIG to "read_committed",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true
)
}
}
- spring开机版本:2.3.0
- spring-kafka版本:2.5.3
- kafka客户端版本:2.5.0
忽略IntelliJ关于自动接线的警告;该bean 确实存在;只是IntelliJ检测不到而已