spring kafka - 来自单个主题的多个消费者阅读
spring kafka - multiple consumer reading from a single topic
我正在使用 spring 启动构建一个 Web 应用程序,现在我有接收实时通知的要求。我打算为此使用 apache kafka 作为消息代理。
要求是存在具有不同角色的用户,并且根据角色,他们应该收到其他用户正在做什么的通知。
我确实设置了一个生产者和消费者,作为消费者,我可以接收发布到主题的信息,比如 topic1。
我卡住的部分是我可以让多个用户收听同一个主题,并且每个用户都应该获得发布到该主题的消息。我了解到对于这个需求,我们需要为每个kafkalistener设置不同的group.id,以便每个消费者都能得到消息。
但是,当用户登录时,我将如何创建一个具有不同组 ID 的 kafkalistener?
希望有人可以提供一些指导吗?
谢谢
只需每次创建一个新的 KafkaMessageListenerContainer
,然后根据需要 start/stop。
您可以使用 Boot 的自动配置 ConcurrentKafkaListenerContainerFactory
来创建容器。只需设置 groupId
容器 属性 即可使它们独一无二。
编辑
这是一个例子:
@SpringBootApplication
public class So60150686Application {
public static void main(String[] args) {
SpringApplication.run(So60150686Application.class, args);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("so60150686", "foo");
};
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so60150686").partitions(1).replicas(1).build();
}
}
@RestController
class Web {
private final ConcurrentKafkaListenerContainerFactory<String, String> factory;
public Web(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
this.factory = factory;
}
@GetMapping(path="/foo/{group}")
public String foo(@PathVariable String group) {
ConcurrentMessageListenerContainer<String, String> container = factory.createContainer("so60150686");
container.getContainerProperties().setGroupId(group);
container.getContainerProperties().setMessageListener(new MessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> record) {
System.out.println(record);
}
});
container.start();
return "ok";
}
}
spring.kafka.consumer.auto-offset-reset=earliest
$ http localhost:8080/foo/bar
HTTP/1.1 200
Connection: keep-alive
Content-Length: 2
Content-Type: text/plain;charset=UTF-8
Date: Mon, 10 Feb 2020 19:42:02 GMT
Keep-Alive: timeout=60
ok
2020-02-10 14:42:09.744 INFO 34096 --- [ consumer-0-C-1] o.s.k.l.KafkaMessageListenerContainer : bar: partitions assigned: [so60150686-0]
ConsumerRecord(topic = so60150686, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1581363648938, serialized key size = -1, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = foo)
我正在使用 spring 启动构建一个 Web 应用程序,现在我有接收实时通知的要求。我打算为此使用 apache kafka 作为消息代理。
要求是存在具有不同角色的用户,并且根据角色,他们应该收到其他用户正在做什么的通知。
我确实设置了一个生产者和消费者,作为消费者,我可以接收发布到主题的信息,比如 topic1。
我卡住的部分是我可以让多个用户收听同一个主题,并且每个用户都应该获得发布到该主题的消息。我了解到对于这个需求,我们需要为每个kafkalistener设置不同的group.id,以便每个消费者都能得到消息。
但是,当用户登录时,我将如何创建一个具有不同组 ID 的 kafkalistener?
希望有人可以提供一些指导吗?
谢谢
只需每次创建一个新的 KafkaMessageListenerContainer
,然后根据需要 start/stop。
您可以使用 Boot 的自动配置 ConcurrentKafkaListenerContainerFactory
来创建容器。只需设置 groupId
容器 属性 即可使它们独一无二。
编辑
这是一个例子:
@SpringBootApplication
public class So60150686Application {
public static void main(String[] args) {
SpringApplication.run(So60150686Application.class, args);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("so60150686", "foo");
};
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so60150686").partitions(1).replicas(1).build();
}
}
@RestController
class Web {
private final ConcurrentKafkaListenerContainerFactory<String, String> factory;
public Web(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
this.factory = factory;
}
@GetMapping(path="/foo/{group}")
public String foo(@PathVariable String group) {
ConcurrentMessageListenerContainer<String, String> container = factory.createContainer("so60150686");
container.getContainerProperties().setGroupId(group);
container.getContainerProperties().setMessageListener(new MessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> record) {
System.out.println(record);
}
});
container.start();
return "ok";
}
}
spring.kafka.consumer.auto-offset-reset=earliest
$ http localhost:8080/foo/bar
HTTP/1.1 200
Connection: keep-alive
Content-Length: 2
Content-Type: text/plain;charset=UTF-8
Date: Mon, 10 Feb 2020 19:42:02 GMT
Keep-Alive: timeout=60
ok
2020-02-10 14:42:09.744 INFO 34096 --- [ consumer-0-C-1] o.s.k.l.KafkaMessageListenerContainer : bar: partitions assigned: [so60150686-0]
ConsumerRecord(topic = so60150686, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1581363648938, serialized key size = -1, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = foo)