我们可以为不同的主题使用相同的容器吗?
can we use same container for different topics?
我有两个不同的 Kafka 主题。两个主题中的数据类型和数据结构 (Json) 相同。
我可以为两个主题使用相同的侦听器容器吗?
我尝试使用并且能够从两个主题中获取输出。
以下是我的配置。
@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer factoryConfigure,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factoryConfigure.configure(factory, kafkaConsumerFactory);
factory.setBatchListener(true);
return factory;
}
这是我的听众。
@KafkaListener(id = "#{'${spring.kafka.listener.id}'}", topics = "#{'${spring.kafka.consumer.topic}'}", groupId = "#{'${spring.kafka.consumer.group-id}'}")
public void getTopics( List<Request> model) {
System.out.println("I am from topic1");
System.out.println(model);
}
@KafkaListener(id = "#{'${spring.kafka.listener.id2}'}", topics = "#{'${spring.kafka.consumer.topic2}'}", groupId = "#{'${spring.kafka.consumer.group-id2}'}")
public void getTopics( List<Request> model) {
System.out.println("I am from topic2");
System.out.println(model);
}
我没有明确使用任何注释来提供容器名称。
这会导致我遇到任何问题吗?
不清楚你问的是什么;容器工厂将为每个侦听器创建一个不同的容器。
topics
属性 是一个数组所以你可以使用
@KafkaListener(id = "#{'${spring.kafka.listener.id}'}",
topics = { "${spring.kafka.consumer.topic}",
"${spring.kafka.consumer.topic2}" },
groupId = "#{'${spring.kafka.consumer.group-id}'}")
public void getTopics( List<Request> model) {
System.out.println("I am from both topics");
System.out.println(model);
}
如果您需要知道每条记录来自哪个主题,可以使用
public void getTopics( List<Request> model,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics) {
索引 n 的主题用于模型索引 n 的记录。
我有两个不同的 Kafka 主题。两个主题中的数据类型和数据结构 (Json) 相同。 我可以为两个主题使用相同的侦听器容器吗? 我尝试使用并且能够从两个主题中获取输出。
以下是我的配置。
@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer factoryConfigure,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factoryConfigure.configure(factory, kafkaConsumerFactory);
factory.setBatchListener(true);
return factory;
}
这是我的听众。
@KafkaListener(id = "#{'${spring.kafka.listener.id}'}", topics = "#{'${spring.kafka.consumer.topic}'}", groupId = "#{'${spring.kafka.consumer.group-id}'}")
public void getTopics( List<Request> model) {
System.out.println("I am from topic1");
System.out.println(model);
}
@KafkaListener(id = "#{'${spring.kafka.listener.id2}'}", topics = "#{'${spring.kafka.consumer.topic2}'}", groupId = "#{'${spring.kafka.consumer.group-id2}'}")
public void getTopics( List<Request> model) {
System.out.println("I am from topic2");
System.out.println(model);
}
我没有明确使用任何注释来提供容器名称。 这会导致我遇到任何问题吗?
不清楚你问的是什么;容器工厂将为每个侦听器创建一个不同的容器。
topics
属性 是一个数组所以你可以使用
@KafkaListener(id = "#{'${spring.kafka.listener.id}'}",
topics = { "${spring.kafka.consumer.topic}",
"${spring.kafka.consumer.topic2}" },
groupId = "#{'${spring.kafka.consumer.group-id}'}")
public void getTopics( List<Request> model) {
System.out.println("I am from both topics");
System.out.println(model);
}
如果您需要知道每条记录来自哪个主题,可以使用
public void getTopics( List<Request> model,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics) {
索引 n 的主题用于模型索引 n 的记录。