如何告诉 RSocket 通过阻塞队列支持的 Java 8 Stream 读取数据流
How to tell RSocket to read data stream by Java 8 Stream which backed by Blocking queue
我有以下场景,我的程序使用阻塞队列异步处理消息。有多个 RSocket 客户端希望接收此消息。我的设计是这样的,当消息到达阻塞队列时,绑定到 Flux 的流将发出。我已尝试按如下方式实现此要求,但客户端没有收到任何响应。但是,我可以看到 Stream 供应商被正确触发。
有人可以帮忙吗。
@MessageMapping("addListenerHook")
public Flux<QueryResult> addListenerHook(String clientName){
System.out.println("Adding Listener:"+clientName);
BlockingQueue<QueryResult> listenerQ = new LinkedBlockingQueue<>();
Datalistener.register(clientName,listenerQ);
return Flux.fromStream(
()-> Stream.generate(()->streamValue(listenerQ))).map(q->{
System.out.println("I got an event : "+q.getResult());
return q;
});
}
private QueryResult streamValue(BlockingQueue<QueryResult> inStream){
try{
return inStream.take();
}catch(Exception e){
return null;
}
}
由于阻塞API,这很难简单干净地解决。我认为这就是为什么这里没有简单的桥 API 来帮助您实现它。您应该想出一个干净的解决方案,首先将 BlockingQueue 变成 Flux。然后 spring-boot 部分就变成了一个非事件。
这就是为什么正确的解决方案可能涉及自定义 BlockingQueue 实现,例如 https://www.nurkiewicz.com/2015/07/consuming-javautilconcurrentblockingque.html
中的 ObservableQueue
另一种方法是
如果您需要保留 LinkedBlockingQueue,起始解决方案可能如下所示。
val f = flux<String> {
val listenerQ = LinkedBlockingQueue<QueryResult>()
Datalistener.register(clientName,listenerQ);
while (true) {
send(bq.take())
}
}.subscribeOn(Schedulers.elastic())
使用 API 之类的通量,您绝对应该在订阅之前避免任何副作用,因此在方法主体内部之前不要注册您的侦听器。但是您需要改进此示例以处理取消,或者您取消侦听器并中断执行 take 的线程。
我有以下场景,我的程序使用阻塞队列异步处理消息。有多个 RSocket 客户端希望接收此消息。我的设计是这样的,当消息到达阻塞队列时,绑定到 Flux 的流将发出。我已尝试按如下方式实现此要求,但客户端没有收到任何响应。但是,我可以看到 Stream 供应商被正确触发。
有人可以帮忙吗。
@MessageMapping("addListenerHook")
public Flux<QueryResult> addListenerHook(String clientName){
System.out.println("Adding Listener:"+clientName);
BlockingQueue<QueryResult> listenerQ = new LinkedBlockingQueue<>();
Datalistener.register(clientName,listenerQ);
return Flux.fromStream(
()-> Stream.generate(()->streamValue(listenerQ))).map(q->{
System.out.println("I got an event : "+q.getResult());
return q;
});
}
private QueryResult streamValue(BlockingQueue<QueryResult> inStream){
try{
return inStream.take();
}catch(Exception e){
return null;
}
}
由于阻塞API,这很难简单干净地解决。我认为这就是为什么这里没有简单的桥 API 来帮助您实现它。您应该想出一个干净的解决方案,首先将 BlockingQueue 变成 Flux。然后 spring-boot 部分就变成了一个非事件。
这就是为什么正确的解决方案可能涉及自定义 BlockingQueue 实现,例如 https://www.nurkiewicz.com/2015/07/consuming-javautilconcurrentblockingque.html
中的 ObservableQueue另一种方法是
如果您需要保留 LinkedBlockingQueue,起始解决方案可能如下所示。
val f = flux<String> {
val listenerQ = LinkedBlockingQueue<QueryResult>()
Datalistener.register(clientName,listenerQ);
while (true) {
send(bq.take())
}
}.subscribeOn(Schedulers.elastic())
使用 API 之类的通量,您绝对应该在订阅之前避免任何副作用,因此在方法主体内部之前不要注册您的侦听器。但是您需要改进此示例以处理取消,或者您取消侦听器并中断执行 take 的线程。