不调用 Kafka 侦听器方法。消费者不消费。
Kafka Listener Method is not invoked. Consumer not consuming.
前面提到的应该从单个主题消费的 kafka 消费者。我无法使用 spring 启动,因为我正在将 kafka 消费者 api 与 spring 核心网络应用程序集成..
springxml配置如下
<bean id="kafkaConsumerProperties" class="com.azuga.kafka.listeners.KafkaConsumerProperties">
<constructor-arg type="java.lang.String" value="127.0.0.1:9092" />
<constructor-arg type="java.lang.String" value="tdm-group" />
<constructor-arg type="java.lang.String" value="dbStreamer.azuga.tripDriverMapping" />
</bean>
<bean id="kafkaListenerConfig" class="com.azuga.kafka.listeners.KafkaListenerConfig">
<property name="kafkaConsumerProperties" ref="kafkaConsumerProperties" />
</bean>
<bean id="kafkaContainerFactory" class="com.azuga.kafka.listeners.KafkaListenerContainerFactory"
factory-method="kafkaContainerFactory">
</bean>
这是创建 ListenerContainerFactory
的 class
@EnableKafka
public class KafkaListenerContainerFactory {
public static ConcurrentKafkaListenerContainerFactory<String, String> kafkaContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(1);
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@SuppressWarnings("unchecked")
public static ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(KafkaListenerConfig.consumerProps(),
KafkaListenerConfig.stringKeyDeserializer(), KafkaListenerConfig.stringKeyDeserializer());
}
}
这是我的监听器Class,带有@KafkaListener
注释
package com.azuga.kafka.listeners;
import org.springframework.kafka.annotation.KafkaListener;
public class Listener {
@KafkaListener(topics = "dbStreamer.azuga.tripDriverMapping")
public void onMessage(String message) {
System.out.println(message.toString());
}
}
这是 KafkaListenerConfig class,它接受 bootstrap 服务器、主题名称等
@EnableKafka
public class KafkaListenerConfig {
private static KafkaConsumerProperties kafkaConsumerProperties;
public void setKafkaConsumerProperties(KafkaConsumerProperties kafkaConsumerProperties) {
this.kafkaConsumerProperties = kafkaConsumerProperties;
}
public static Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConsumerProperties.getBootstrap());
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerProperties.getGroup());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
return props;
}
public static Deserializer stringKeyDeserializer() {
return new StringDeserializer();
}
}
您的应用程序配置有点不寻常。
但是我假设您忽略了 @EnableKafka
与 @Configuration
class 相关的事实。因此,根据 Spring 框架文档,您必须使用 AnnotationConfigWebApplicationContext
class:
* {@link org.springframework.web.context.WebApplicationContext WebApplicationContext}
* implementation which accepts annotated classes as input - in particular
* {@link org.springframework.context.annotation.Configuration @Configuration}-annotated
* classes, but also plain {@link org.springframework.stereotype.Component @Component}
* classes and JSR-330 compliant classes using {@code javax.inject} annotations. Allows
* for registering classes one by one (specifying class names as config location) as well
* as for classpath scanning (specifying base packages as config location).
不幸的是,这不适用于简单的 XML 配置。
Spring Kafka 没有为 XML 定义提供任何钩子。
前面提到的应该从单个主题消费的 kafka 消费者。我无法使用 spring 启动,因为我正在将 kafka 消费者 api 与 spring 核心网络应用程序集成..
springxml配置如下
<bean id="kafkaConsumerProperties" class="com.azuga.kafka.listeners.KafkaConsumerProperties">
<constructor-arg type="java.lang.String" value="127.0.0.1:9092" />
<constructor-arg type="java.lang.String" value="tdm-group" />
<constructor-arg type="java.lang.String" value="dbStreamer.azuga.tripDriverMapping" />
</bean>
<bean id="kafkaListenerConfig" class="com.azuga.kafka.listeners.KafkaListenerConfig">
<property name="kafkaConsumerProperties" ref="kafkaConsumerProperties" />
</bean>
<bean id="kafkaContainerFactory" class="com.azuga.kafka.listeners.KafkaListenerContainerFactory"
factory-method="kafkaContainerFactory">
</bean>
这是创建 ListenerContainerFactory
的 class@EnableKafka
public class KafkaListenerContainerFactory {
public static ConcurrentKafkaListenerContainerFactory<String, String> kafkaContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(1);
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@SuppressWarnings("unchecked")
public static ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(KafkaListenerConfig.consumerProps(),
KafkaListenerConfig.stringKeyDeserializer(), KafkaListenerConfig.stringKeyDeserializer());
}
}
这是我的监听器Class,带有@KafkaListener
注释package com.azuga.kafka.listeners;
import org.springframework.kafka.annotation.KafkaListener;
public class Listener {
@KafkaListener(topics = "dbStreamer.azuga.tripDriverMapping")
public void onMessage(String message) {
System.out.println(message.toString());
}
}
这是 KafkaListenerConfig class,它接受 bootstrap 服务器、主题名称等
@EnableKafka
public class KafkaListenerConfig {
private static KafkaConsumerProperties kafkaConsumerProperties;
public void setKafkaConsumerProperties(KafkaConsumerProperties kafkaConsumerProperties) {
this.kafkaConsumerProperties = kafkaConsumerProperties;
}
public static Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConsumerProperties.getBootstrap());
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerProperties.getGroup());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
return props;
}
public static Deserializer stringKeyDeserializer() {
return new StringDeserializer();
}
}
您的应用程序配置有点不寻常。
但是我假设您忽略了 @EnableKafka
与 @Configuration
class 相关的事实。因此,根据 Spring 框架文档,您必须使用 AnnotationConfigWebApplicationContext
class:
* {@link org.springframework.web.context.WebApplicationContext WebApplicationContext}
* implementation which accepts annotated classes as input - in particular
* {@link org.springframework.context.annotation.Configuration @Configuration}-annotated
* classes, but also plain {@link org.springframework.stereotype.Component @Component}
* classes and JSR-330 compliant classes using {@code javax.inject} annotations. Allows
* for registering classes one by one (specifying class names as config location) as well
* as for classpath scanning (specifying base packages as config location).
不幸的是,这不适用于简单的 XML 配置。
Spring Kafka 没有为 XML 定义提供任何钩子。