如何告诉 RSocket 通过阻塞队列支持的 Java 8 Stream 读取数据流

How to tell RSocket to read data stream by Java 8 Stream which backed by Blocking queue

我有以下场景,我的程序使用阻塞队列异步处理消息。有多个 RSocket 客户端希望接收此消息。我的设计是这样的,当消息到达阻塞队列时,绑定到 Flux 的流将发出。我已尝试按如下方式实现此要求,但客户端没有收到任何响应。但是,我可以看到 Stream 供应商被正确触发。

有人可以帮忙吗。

  @MessageMapping("addListenerHook")
public Flux<QueryResult> addListenerHook(String clientName){
    System.out.println("Adding Listener:"+clientName);
    BlockingQueue<QueryResult> listenerQ = new LinkedBlockingQueue<>();
    Datalistener.register(clientName,listenerQ);

    return Flux.fromStream(
            ()-> Stream.generate(()->streamValue(listenerQ))).map(q->{
        System.out.println("I got an event : "+q.getResult());
        return q;
    });
}


private QueryResult streamValue(BlockingQueue<QueryResult> inStream){
    try{
        return inStream.take();
    }catch(Exception e){
        return null;
    }
}

由于阻塞API,这很难简单干净地解决。我认为这就是为什么这里没有简单的桥 API 来帮助您实现它。您应该想出一个干净的解决方案,首先将 BlockingQueue 变成 Flux。然后 spring-boot 部分就变成了一个非事件。

这就是为什么正确的解决方案可能涉及自定义 BlockingQueue 实现,例如 https://www.nurkiewicz.com/2015/07/consuming-javautilconcurrentblockingque.html

中的 ObservableQueue

另一种方法是

如果您需要保留 LinkedBlockingQueue,起始解决方案可能如下所示。

  val f = flux<String> {
    val listenerQ = LinkedBlockingQueue<QueryResult>()
    Datalistener.register(clientName,listenerQ);
    
    while (true) {
      send(bq.take())
    }
  }.subscribeOn(Schedulers.elastic())

使用 API 之类的通量,您绝对应该在订阅之前避免任何副作用,因此在方法主体内部之前不要注册您的侦听器。但是您需要改进此示例以处理取消,或者您取消侦听器并中断执行 take 的线程。