spring kafka - 来自单个主题的多个消费者阅读

spring kafka - multiple consumer reading from a single topic

我正在使用 spring 启动构建一个 Web 应用程序,现在我有接收实时通知的要求。我打算为此使用 apache kafka 作为消息代理。 要求是存在具有不同角色的用户,并且根据角色,他们应该收到其他用户正在做什么的通知。
我确实设置了一个生产者和消费者,作为消费者,我可以接收发布到主题的信息,比如 topic1。
我卡住的部分是我可以让多个用户收听同一个主题,并且每个用户都应该获得发布到该主题的消息。我了解到对于这个需求,我们需要为每个kafkalistener设置不同的group.id,以便每个消费者都能得到消息。
但是,当用户登录时,我将如何创建一个具有不同组 ID 的 kafkalistener? 希望有人可以提供一些指导吗? 谢谢

只需每次创建一个新的 KafkaMessageListenerContainer,然后根据需要 start/stop。

您可以使用 Boot 的自动配置 ConcurrentKafkaListenerContainerFactory 来创建容器。只需设置 groupId 容器 属性 即可使它们独一无二。

编辑

这是一个例子:

@SpringBootApplication
public class So60150686Application {

    public static void main(String[] args) {
        SpringApplication.run(So60150686Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so60150686", "foo");
        };
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so60150686").partitions(1).replicas(1).build();
    }

}

@RestController
class Web {

    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;

    public Web(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        this.factory = factory;
    }

    @GetMapping(path="/foo/{group}")
    public String foo(@PathVariable String group) {
        ConcurrentMessageListenerContainer<String, String> container = factory.createContainer("so60150686");
        container.getContainerProperties().setGroupId(group);
        container.getContainerProperties().setMessageListener(new MessageListener<String, String>() {

            @Override
            public void onMessage(ConsumerRecord<String, String> record) {
                System.out.println(record);
            }

        });
        container.start();
        return "ok";
    }

}
spring.kafka.consumer.auto-offset-reset=earliest
$ http localhost:8080/foo/bar
HTTP/1.1 200 
Connection: keep-alive
Content-Length: 2
Content-Type: text/plain;charset=UTF-8
Date: Mon, 10 Feb 2020 19:42:02 GMT
Keep-Alive: timeout=60

ok

2020-02-10 14:42:09.744 INFO 34096 --- [ consumer-0-C-1] o.s.k.l.KafkaMessageListenerContainer : bar: partitions assigned: [so60150686-0]

ConsumerRecord(topic = so60150686, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1581363648938, serialized key size = -1, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = foo)