Spring 卡夫卡监听器 |阅读相同的消息

Spring Kafka Listner | Read same messages

我有一个主题名称 "user",其中将删除用户 ID,我想阅读该主题并处理以下功能

1. process user leave data
2. process user salary data

我想让两个侦听器指向同一个主题并读取同一个用户 ID 并并行启动处理。

@KafkaListener(topics = "${kafka.topic.user}",group="abc"))
            public void receive(String message) {

                    userService.processLeave(message);
                }

@KafkaListener(topics = "${kafka.topic.user}",group="abc1"))
            public void receive1(String message) {

                    userService.processPayRoll(message);
                }

但我一直看到 -- processPayRoll 一直被调用。

我错过了什么?

嗯,看起来你使用的是旧的 Spring Kafka 版本。

很遗憾,group 与消费者的 group.id 无关。 即 containerGroup 用于生命周期管理。

您应该考虑根据不同的消费者配置来配置不同的 KafkaMessageListenerContainer。在那里你已经配置了不同的 ConsumerConfig.GROUP_ID_CONFIG.

最新版本的 @KafkaListener 结构如下:

/**
 * If provided, the listener container for this listener will be added to a bean
 * with this value as its name, of type {@code Collection<MessageListenerContainer>}.
 * This allows, for example, iteration over the collection to start/stop a subset
 * of containers.
 * @return the bean name for the group.
 */
String containerGroup() default "";

/**
 * Override the {@code group.id} property for the consumer factory with this value
 * for this listener only.
 * @return the group id.
 * @since 1.3
 */
String groupId() default "";

/**
 * When {@link #groupId() groupId} is not provided, use the {@link #id() id} (if
 * provided) as the {@code group.id} property for the consumer. Set to false, to use
 * the {@code group.id} from the consumer factory.
 * @return false to disable.
 * @since 1.3
 */
boolean idIsGroup() default true;