如何在发送下一个内容之前等待变量更改,reactor-netty 的 websocket
How to wait for variables to change before sending the next content , websocket by reactor-netty
怎么办? ReadFlushMessage() 将等待这个。消息以获取新内容。
没有立即退出 websocket 连接
private Set<String> message = new HashSet<>();
private void writeMessage(String message) {
this.message.add(message);
}
private String[] readFlushMessage() {
String[] _message = (String[])this.message.toArray();
this.message = new HashSet<>();
return _message;
}
private Publisher<Void> websocketPublisherA(HttpServerRequest request, HttpServerResponse response, WebSocketServerHandle handleObject) {
return response
.header("content-type", "text/plain")
.sendWebsocket((in, out) ->
out.options(NettyPipeline.SendOptions::flushOnEach)
.sendString(
Flux.just(readFlushMessage())
)
);
}
尝试使用 Reactor Core 类型。
这是一个非常简单的示例,说明您想要实现的目标:
@Test
public void test() {
FluxProcessor<String, String> serverMsg =
ReplayProcessor.<String>create();
Flux.range(1, 20)
.map(Object::toString)
.subscribe(serverMsg::onNext);
DisposableServer server =
HttpServer.create()
.port(0)
.handle((req, resp) ->
resp.header("content-type", "text/plain")
.sendWebsocket((in, out) ->
out.options(NettyPipeline.SendOptions::flushOnEach)
.sendString(serverMsg)
))
.bindNow();
HttpClient.create()
.port(server.port())
.websocket()
.receive()
.asString()
.doOnNext(System.out::println)
.blockLast();
server.disposeNow();
}
怎么办? ReadFlushMessage() 将等待这个。消息以获取新内容。
没有立即退出 websocket 连接
private Set<String> message = new HashSet<>();
private void writeMessage(String message) {
this.message.add(message);
}
private String[] readFlushMessage() {
String[] _message = (String[])this.message.toArray();
this.message = new HashSet<>();
return _message;
}
private Publisher<Void> websocketPublisherA(HttpServerRequest request, HttpServerResponse response, WebSocketServerHandle handleObject) {
return response
.header("content-type", "text/plain")
.sendWebsocket((in, out) ->
out.options(NettyPipeline.SendOptions::flushOnEach)
.sendString(
Flux.just(readFlushMessage())
)
);
}
尝试使用 Reactor Core 类型。
这是一个非常简单的示例,说明您想要实现的目标:
@Test
public void test() {
FluxProcessor<String, String> serverMsg =
ReplayProcessor.<String>create();
Flux.range(1, 20)
.map(Object::toString)
.subscribe(serverMsg::onNext);
DisposableServer server =
HttpServer.create()
.port(0)
.handle((req, resp) ->
resp.header("content-type", "text/plain")
.sendWebsocket((in, out) ->
out.options(NettyPipeline.SendOptions::flushOnEach)
.sendString(serverMsg)
))
.bindNow();
HttpClient.create()
.port(server.port())
.websocket()
.receive()
.asString()
.doOnNext(System.out::println)
.blockLast();
server.disposeNow();
}