如何响应请求通过 Spring 响应式 websocket 推送数据?
How to push data over reactive websocket with Spring in response to request?
我正在使用 Spring Boot 2.1.3 开始使用反应式 websockets。我创建了一个 WebSocketHandler
实现,如下所示:
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(1);
var publisher = flux.map( o -> {
try {
return objectMapper.writeValueAsString(o);
} catch (JsonProcessingException e) {
e.printStackTrace();
return null;
}
}).map(session::textMessage)
.delayElements(Duration.ofSeconds(1));
return session.send(publisher);
}
这有效,如果我连接,我在我的 websocket 客户端中每秒得到序列化 EfficiencyData
。
但是,我想对来自 websocket 的请求做出反应,告诉 service
我想要数据的 ID。我设法得到这样的请求信息:
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.send(session.receive().map(webSocketMessage -> {
int id = Integer.parseInt(webSocketMessage.getPayloadAsText());
return session.textMessage("Subscribing with id " + id);
}));
现在我不知道如何结合这两个实现?
我希望做这样的事情:
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.send(session.receive().map(webSocketMessage -> {
int id = Integer.parseInt(webSocketMessage.getPayloadAsText());
Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(id);
var publisher = flux.map( o -> {
try {
return objectMapper.writeValueAsString(o);
} catch (JsonProcessingException e) {
e.printStackTrace();
return null;
}
}).map(session::textMessage)
.delayElements(Duration.ofSeconds(1));
return publisher; //Does not compile
}));
但这不会编译,因为 publisher
是一个 Flux<WebSocketMessage>
,它应该是一个 Publisher<WebSocketMessage>
。应该如何处理?
编辑:
根据 WebSocketHandler
的 Javadoc 示例,我尝试了这个:
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<EfficiencyData> flux =
session.receive()
.map(webSocketMessage -> Integer.parseInt(webSocketMessage.getPayloadAsText()))
.concatMap(service::subscribeToEfficiencyData);
Mono<Void> input = flux.then();
Mono<Void> output = session.send(flux.map(data -> session.textMessage(data.toString()))).then();
return Mono.zip(input, output).then();
}
但这只会立即断开 websocket 客户端的连接,而无需执行任何操作。
使用 flatMap
或 concatMap
以扁平化 returned publisher
要解决您的问题,您必须使用允许平整 returned 值的运算符。例如
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.send(
session.receive()
.flatMap(webSocketMessage -> {
int id = Integer.parseInt(webSocketMessage.getPayloadAsText());
Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(id);
var publisher = flux
.<String>handle((o, sink) -> {
try {
sink.next(objectMapper.writeValueAsString(o));
} catch (JsonProcessingException e) {
e.printStackTrace();
return; // null is prohibited in reactive-streams
}
})
.map(session::textMessage)
.delayElements(Duration.ofSeconds(1));
return publisher;
})
);
}
要点
- 如果return类型是流,使用
flatMap
或concatMap
(见区别here
- 从不 returns
Null
。在反应流中 Null
是禁止值(参见规范规则 here
- 当映射可以以
null
结束时 -> 使用 handle
运算符。查看更多解释
我正在使用 Spring Boot 2.1.3 开始使用反应式 websockets。我创建了一个 WebSocketHandler
实现,如下所示:
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(1);
var publisher = flux.map( o -> {
try {
return objectMapper.writeValueAsString(o);
} catch (JsonProcessingException e) {
e.printStackTrace();
return null;
}
}).map(session::textMessage)
.delayElements(Duration.ofSeconds(1));
return session.send(publisher);
}
这有效,如果我连接,我在我的 websocket 客户端中每秒得到序列化 EfficiencyData
。
但是,我想对来自 websocket 的请求做出反应,告诉 service
我想要数据的 ID。我设法得到这样的请求信息:
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.send(session.receive().map(webSocketMessage -> {
int id = Integer.parseInt(webSocketMessage.getPayloadAsText());
return session.textMessage("Subscribing with id " + id);
}));
现在我不知道如何结合这两个实现?
我希望做这样的事情:
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.send(session.receive().map(webSocketMessage -> {
int id = Integer.parseInt(webSocketMessage.getPayloadAsText());
Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(id);
var publisher = flux.map( o -> {
try {
return objectMapper.writeValueAsString(o);
} catch (JsonProcessingException e) {
e.printStackTrace();
return null;
}
}).map(session::textMessage)
.delayElements(Duration.ofSeconds(1));
return publisher; //Does not compile
}));
但这不会编译,因为 publisher
是一个 Flux<WebSocketMessage>
,它应该是一个 Publisher<WebSocketMessage>
。应该如何处理?
编辑:
根据 WebSocketHandler
的 Javadoc 示例,我尝试了这个:
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<EfficiencyData> flux =
session.receive()
.map(webSocketMessage -> Integer.parseInt(webSocketMessage.getPayloadAsText()))
.concatMap(service::subscribeToEfficiencyData);
Mono<Void> input = flux.then();
Mono<Void> output = session.send(flux.map(data -> session.textMessage(data.toString()))).then();
return Mono.zip(input, output).then();
}
但这只会立即断开 websocket 客户端的连接,而无需执行任何操作。
使用 flatMap
或 concatMap
以扁平化 returned publisher
要解决您的问题,您必须使用允许平整 returned 值的运算符。例如
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.send(
session.receive()
.flatMap(webSocketMessage -> {
int id = Integer.parseInt(webSocketMessage.getPayloadAsText());
Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(id);
var publisher = flux
.<String>handle((o, sink) -> {
try {
sink.next(objectMapper.writeValueAsString(o));
} catch (JsonProcessingException e) {
e.printStackTrace();
return; // null is prohibited in reactive-streams
}
})
.map(session::textMessage)
.delayElements(Duration.ofSeconds(1));
return publisher;
})
);
}
要点
- 如果return类型是流,使用
flatMap
或concatMap
(见区别here - 从不 returns
Null
。在反应流中Null
是禁止值(参见规范规则 here - 当映射可以以
null
结束时 -> 使用handle
运算符。查看更多解释