Spring 卡夫卡监听器 |阅读相同的消息
Spring Kafka Listner | Read same messages
我有一个主题名称 "user",其中将删除用户 ID,我想阅读该主题并处理以下功能
1. process user leave data
2. process user salary data
我想让两个侦听器指向同一个主题并读取同一个用户 ID 并并行启动处理。
@KafkaListener(topics = "${kafka.topic.user}",group="abc"))
public void receive(String message) {
userService.processLeave(message);
}
@KafkaListener(topics = "${kafka.topic.user}",group="abc1"))
public void receive1(String message) {
userService.processPayRoll(message);
}
但我一直看到 -- processPayRoll 一直被调用。
我错过了什么?
嗯,看起来你使用的是旧的 Spring Kafka 版本。
很遗憾,group
与消费者的 group.id
无关。
即 containerGroup
用于生命周期管理。
您应该考虑根据不同的消费者配置来配置不同的 KafkaMessageListenerContainer
。在那里你已经配置了不同的 ConsumerConfig.GROUP_ID_CONFIG
.
最新版本的 @KafkaListener
结构如下:
/**
* If provided, the listener container for this listener will be added to a bean
* with this value as its name, of type {@code Collection<MessageListenerContainer>}.
* This allows, for example, iteration over the collection to start/stop a subset
* of containers.
* @return the bean name for the group.
*/
String containerGroup() default "";
/**
* Override the {@code group.id} property for the consumer factory with this value
* for this listener only.
* @return the group id.
* @since 1.3
*/
String groupId() default "";
/**
* When {@link #groupId() groupId} is not provided, use the {@link #id() id} (if
* provided) as the {@code group.id} property for the consumer. Set to false, to use
* the {@code group.id} from the consumer factory.
* @return false to disable.
* @since 1.3
*/
boolean idIsGroup() default true;
我有一个主题名称 "user",其中将删除用户 ID,我想阅读该主题并处理以下功能
1. process user leave data
2. process user salary data
我想让两个侦听器指向同一个主题并读取同一个用户 ID 并并行启动处理。
@KafkaListener(topics = "${kafka.topic.user}",group="abc"))
public void receive(String message) {
userService.processLeave(message);
}
@KafkaListener(topics = "${kafka.topic.user}",group="abc1"))
public void receive1(String message) {
userService.processPayRoll(message);
}
但我一直看到 -- processPayRoll 一直被调用。
我错过了什么?
嗯,看起来你使用的是旧的 Spring Kafka 版本。
很遗憾,group
与消费者的 group.id
无关。
即 containerGroup
用于生命周期管理。
您应该考虑根据不同的消费者配置来配置不同的 KafkaMessageListenerContainer
。在那里你已经配置了不同的 ConsumerConfig.GROUP_ID_CONFIG
.
最新版本的 @KafkaListener
结构如下:
/**
* If provided, the listener container for this listener will be added to a bean
* with this value as its name, of type {@code Collection<MessageListenerContainer>}.
* This allows, for example, iteration over the collection to start/stop a subset
* of containers.
* @return the bean name for the group.
*/
String containerGroup() default "";
/**
* Override the {@code group.id} property for the consumer factory with this value
* for this listener only.
* @return the group id.
* @since 1.3
*/
String groupId() default "";
/**
* When {@link #groupId() groupId} is not provided, use the {@link #id() id} (if
* provided) as the {@code group.id} property for the consumer. Set to false, to use
* the {@code group.id} from the consumer factory.
* @return false to disable.
* @since 1.3
*/
boolean idIsGroup() default true;