在 REST 端点中流式传输 Kafka 消息
Stream Kafka messages in REST endpoint
我正在尝试通过 spring-boot REST 服务和 kafka 实现一些目标。我想在收到 REST 请求时动态创建一个 kafka 主题的侦听器,听该主题一段时间,比如 10 秒,根据消息中的某些值过滤掉消息(我猜是 RecordFilterStrategy?)和将消息流式传输回 REST 消费者。 Kafka 可以做到这一点吗?
我能够动态地创建一个主题的侦听器,但不知道如何向它添加记录过滤器或如何将这些流式传输回调用者。
这就是我动态创建主题监听器的方式:
public ConcurrentMessageListenerContainer getConsumer(String topic, MessageListener<String, String> listener) {
DefaultKafkaConsumerFactory<String, String> kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(
consumerConfig);
ContainerProperties containerProperties = new ContainerProperties(topic);
containerProperties.setMessageListener(listener);
ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(
kafkaConsumerFactory,
containerProperties);
return container;
}
有人可以指点我有关如何实现此目的的任何文档吗?我创建了一个可以 return 消息通量的 GET 映射,但我不知道如何 return 有关该主题的消息。我知道以下内容不正确,但消息侦听器代码确实被命中了。
@GetMapping(path = "/v1/messages", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Message> getMessages(@RequestParam String id) {
MessageListener<String, String> listener = new MessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> arg0) {
logger.debug("We got a message:" + arg0);
}
};
ConcurrentMessageListenerContainer consumer = consumerFactory.getConsumer("mytopic", listener);
consumer.start();
//return messages here
}
使用您自己的 MessageListener
实现时,您必须自己进行过滤。当使用 Spring 为 @RabbitListener
方法创建的侦听器时,FilteringMessageListenerAdapter
使用 RecordFilterStrategy
。
既然你要返回 Flux,也许看看 https://projectreactor.io/docs/kafka/release/reference/
是个好主意
并使用反应式接收器和 returns 您可以映射的通量
Flux<ReceiverRecord<Integer, String>> inboundFlux =
Receiver.create(receiverOptions)
.receive();
或者查看您的代码,您可以将它们全部拉出,映射并构建通量,但这会阻塞。
我正在尝试通过 spring-boot REST 服务和 kafka 实现一些目标。我想在收到 REST 请求时动态创建一个 kafka 主题的侦听器,听该主题一段时间,比如 10 秒,根据消息中的某些值过滤掉消息(我猜是 RecordFilterStrategy?)和将消息流式传输回 REST 消费者。 Kafka 可以做到这一点吗?
我能够动态地创建一个主题的侦听器,但不知道如何向它添加记录过滤器或如何将这些流式传输回调用者。
这就是我动态创建主题监听器的方式:
public ConcurrentMessageListenerContainer getConsumer(String topic, MessageListener<String, String> listener) {
DefaultKafkaConsumerFactory<String, String> kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(
consumerConfig);
ContainerProperties containerProperties = new ContainerProperties(topic);
containerProperties.setMessageListener(listener);
ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(
kafkaConsumerFactory,
containerProperties);
return container;
}
有人可以指点我有关如何实现此目的的任何文档吗?我创建了一个可以 return 消息通量的 GET 映射,但我不知道如何 return 有关该主题的消息。我知道以下内容不正确,但消息侦听器代码确实被命中了。
@GetMapping(path = "/v1/messages", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Message> getMessages(@RequestParam String id) {
MessageListener<String, String> listener = new MessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> arg0) {
logger.debug("We got a message:" + arg0);
}
};
ConcurrentMessageListenerContainer consumer = consumerFactory.getConsumer("mytopic", listener);
consumer.start();
//return messages here
}
使用您自己的 MessageListener
实现时,您必须自己进行过滤。当使用 Spring 为 @RabbitListener
方法创建的侦听器时,FilteringMessageListenerAdapter
使用 RecordFilterStrategy
。
既然你要返回 Flux,也许看看 https://projectreactor.io/docs/kafka/release/reference/
是个好主意并使用反应式接收器和 returns 您可以映射的通量
Flux<ReceiverRecord<Integer, String>> inboundFlux =
Receiver.create(receiverOptions)
.receive();
或者查看您的代码,您可以将它们全部拉出,映射并构建通量,但这会阻塞。