Spring WebFlux 与 Kafka 和 Websockets

Spring WebFlux with Kafka and Websockets

现在我在我的 SpringBoot 应用程序中实现了一个简单的 Kafka 消费者和生产者,它工作正常我接下来要做的是我的消费者获取消费的消息并将其直接广播给所有订阅的客户端。我发现我不能将 STOMP Messaging 与 WebFlux 一起使用,所以我该如何完成这个任务,我看到了反应式 WebSocket 实现,但我没有弄清楚如何将我使用的数据发送到我的 websocket。

这就是我的简单 KafkaProducer:

fun addMessage(message: Message){
        val headers : MutableMap<String, Any> = HashMap()
        headers[KafkaHeaders.TOPIC] = topicName
        kafkaTemplate.send(GenericMessage<Message>(message, headers))
    }

我的简单消费者看起来像这样:

@KafkaListener(topics = ["mytopic"], groupId = "test-consumer-group")
    fun receiveData(message:Message) :Message{
        //Take consumed data and send to websocket
    }

我会考虑使用 Sinks.many().multicast().onBackpressureBuffer() 作为全局中间容器。然后在您的 receiveData() 中,您只需 将数据吸收 到 Reactor 抽象中。

对于您的 WebSocket 连接会话,我建议实施 org.springframework.web.reactive.socket.WebSocketHandler 并在 WebSocketSession.send(Publisher<WebSocketMessage> messages) API 中使用 Sinks.Many.asFlux()。这样,只要连接到此 WebSocket 服务器,所有会话都将使用相同的 Kafka 数据。

在文档中查看更多信息:https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#webflux-websockethandler

更新

您可以在这里找到一些示例:https://github.com/artembilan/sandbox/tree/master/so-65667450