How to use Spring Reactive WebSocket and transform it into a Flux stream?
The Spring documentation has a WebSocketClient example:
WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(URI.create("ws://localhost:8080/echo"), session -> {...}).blockMillis(5000);
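I'm not sure how to handle the incoming data stream inside that {...} block.
I mean: how do I filter the incoming data and turn it into a Flux?
Here is what I would like to end up with: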
@GetMapping(value = "/stream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<MyRecourse> getStreaming() {
// get some data from WebSocket (CoinCap service).
// Transform that data into MyRecourse object
// Return stream to a client
}
Take a look at the WebSocketSession parameter of the WebSocketHandler.handle() lambda:
/**
* Get the flux of incoming messages.
*/
Flux<WebSocketMessage> receive();
See the Spring WebFlux Workshop for more information.
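Once receive() hands you the messages, the filter-and-transform part is ordinary Java and can live apart from the WebSocket plumbing. Here is a minimal sketch of that step only. Everything in it is an assumption made for illustration: the question does not show the fields of MyRecourse, so name and price are made up, RecourseParser is a hypothetical helper, and the "name:price" text format is invented (the real CoinCap payload is JSON and will need a JSON parser instead).

```java
import java.util.Optional;

// Hypothetical resource type: the fields are assumed, not taken from the question.
final class MyRecourse {
    final String name;
    final double price;

    MyRecourse(String name, double price) {
        this.name = name;
        this.price = price;
    }
}

// Hypothetical helper that turns one text payload into a MyRecourse.
final class RecourseParser {
    private RecourseParser() {
    }

    // Assumed wire format "name:price". Anything that does not match yields
    // an empty Optional, so the caller can filter it out of the stream.
    static Optional<MyRecourse> parse(String payload) {
        if (payload == null) {
            return Optional.empty();
        }
        int idx = payload.indexOf(':');
        if (idx <= 0 || idx == payload.length() - 1) {
            return Optional.empty();
        }
        try {
            String name = payload.substring(0, idx).trim();
            double price = Double.parseDouble(payload.substring(idx + 1).trim());
            return Optional.of(new MyRecourse(name, price));
        } catch (NumberFormatException e) {
            // The price part was not a number: treat the message as irrelevant.
            return Optional.empty();
        }
    }
}
```

Inside the handler this would plug in roughly as session.receive().map(WebSocketMessage::getPayloadAsText).map(RecourseParser::parse).filter(Optional::isPresent).map(Optional::get), which gives a Flux<MyRecourse> that skips unparseable messages instead of failing the whole stream.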
UPDATE
Let's try this!
Mono<Void> sessionMono =
        client.execute(new URI("ws://localhost:8080/echo"),
                session ->
                        Mono.empty()
                                .subscriberContext(Context.of(WebSocketSession.class, session))
                                .then());
return sessionMono
        .thenMany(
                Mono.subscriberContext()
                        .flatMapMany(c -> c
                                .get(WebSocketSession.class)
                                .receive()))
        .map(WebSocketMessage::getPayloadAsText);
UPDATE 2
Or another option, but with a blocking subscription:
EmitterProcessor<String> output = EmitterProcessor.create();
client.execute(new URI("ws://localhost:8080/echo"),
        session ->
                session.receive()
                        .map(WebSocketMessage::getPayloadAsText)
                        .subscribeWith(output)
                        .then())
        .block(Duration.ofMillis(5000));
return output;
UPDATE 3
A working Spring Boot application on the matter: https://github.com/artembilan/webflux-websocket-demo
The main code is like this:
EmitterProcessor<String> output = EmitterProcessor.create();
Mono<Void> sessionMono =
        client.execute(new URI("ws://localhost:8080/echo"),
                session -> session.receive()
                        .map(WebSocketMessage::getPayloadAsText)
                        .subscribeWith(output)
                        .then());
return output.doOnSubscribe(s -> sessionMono.subscribe());