如何在不同的执行上下文/线程上有多个 运行 的 Flux 订阅者
How to have multiple subscribers to Flux that run on different execution contexts / threads
我正在开发 Spring 用于 IoT 实时数据可视化的 Boot WebFlux 应用程序。
我有一个 Flux
模拟来自设备的数据,我希望在每个事件建立 websocket 连接后:
- 必须通过 websocket 发送以进行实时可视化(使用反应式
WebSocketHandler
)
- 必须根据给定条件进行检查,以便通过 HTTP REST 调用发送通知 (
RestTemplate
)
从我的日志来看,两个订阅者(websocket 处理程序和通知程序)似乎获得了两个具有完全不同值的不同流(在日志下方)。
我还尝试了在 MySource
class 中的 map
之后链接 share
方法的变体,在这种情况下,虽然我只有一个 Flux ,只有一个线程,所以一切都是阻塞的(我可以看到 REST 调用阻塞了通过 websocket 发送)。
这里发生了什么?如何使两个订阅者在不同的执行上下文(不同线程)中成为 运行,从而完全相互独立?
相关代码片段和日志下方。
提前谢谢大家!
更新: 为了清楚起见,我必须指定 MyEvent
s 具有随机生成的值,所以我解决了一个问题,感谢@NikolaB 的回答使用保证具有相同 Flux
的 ConnectableFlux
/ share
,但我仍然希望为两个订阅者提供单独的执行上下文。
public class MyWebSocketHandler implements WebSocketHandler {
@Autowired
public MySource mySource;
@Autowired
public Notifier notifier;
public Mono<Void> handle(WebSocketSession webSocketSession) {
Flux<MyEvent> events = mySource.events();
events.subscribe(event -> notifier.sendNotification(event));
return webSocketSession.send(events.map(this::toJson).map(webSocketSession::textMessage));
}
private String toJson(MyEvent event) {
log.info("websocket toJson " + event.getValue());
...
}
}
public class MySource {
public Flux<MyEvent> events() {
return Flux.interval(...).map(i -> new MyEvent(*Random Generate Value*);
}
}
public class Notifier {
public void sendNotification (MyEvent event) {
log.info("notifier sendNotification " + event.getValue());
if (condition met)
restTemplate.exchange(...)
}
}
2019-11-19 11:58:55.375 INFO [ parallel-3] i.a.m.websocket.MyWebSocketHandler : websocket toJson 4.09
2019-11-19 11:58:55.375 INFO [ parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.86
2019-11-19 11:58:57.366 INFO [ parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.24
2019-11-19 11:58:57.374 INFO [ parallel-3] i.a.m.websocket.MyWebSocketHandler : websocket toJson 4.11
2019-11-19 11:58:59.365 INFO [ parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.61
2019-11-19 11:58:59.374 INFO [ parallel-3] i.a.m.websocket.MyWebSocketHandler : websocket toJson 4.03
2019-11-19 11:59:01.365 INFO [ parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.88
2019-11-19 11:59:01.375 INFO [ parallel-3] i.a.m.websocket.MyWebSocketHandler : websocket toJson 4.29
2019-11-19 11:59:03.364 INFO [ parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.37
这里有几个问题,首先 RestTemplate
是 synchronous/blocking HTTP 客户端,所以你应该使用 WebClient
这是反应性的,也可以创建 ConnectableFlux
(Flux
可以有多个订阅者)你需要在 map
运营商之前分享它并创建新的 Flux
-es 是从连接的一个创建的。
示例:
Flux<MyEvent> connectedFlux = mySource.events().share();
Flux.from(connectedFlux).subscribe(event -> notifier.sendNotification(event));
return webSocketSession.send(Flux.from(connectedFlux).map(this::toJson).map(webSocketSession::textMessage));
另外 sendNotification
方法应该 return Mono<Void>
因为反应式方法应该总是 return Mono
或 Flux
类型。
要启动独立执行,您可以 Zip
那两个 Mono
。
编辑
首先,如上所述,使用 WebClient
作为响应式 HTTP 客户端的传出 HTTP 调用,然后修改 Notifier
class:
public class Notifier {
public Mono<Void> sendNotification (MyEvent event) {
log.info("notifier sendNotification " + event.getValue());
return Mono.just(event)
.filter(e -> /* your condition */)
.flatMap(e -> WebClient.builder().baseUrl("XXX")...)
.then();
}
}
现在看看执行上下文是否不同。
我正在开发 Spring 用于 IoT 实时数据可视化的 Boot WebFlux 应用程序。
我有一个 Flux
模拟来自设备的数据,我希望在每个事件建立 websocket 连接后:
- 必须通过 websocket 发送以进行实时可视化(使用反应式
WebSocketHandler
) - 必须根据给定条件进行检查,以便通过 HTTP REST 调用发送通知 (
RestTemplate
)
从我的日志来看,两个订阅者(websocket 处理程序和通知程序)似乎获得了两个具有完全不同值的不同流(在日志下方)。
我还尝试了在 MySource
class 中的 map
之后链接 share
方法的变体,在这种情况下,虽然我只有一个 Flux ,只有一个线程,所以一切都是阻塞的(我可以看到 REST 调用阻塞了通过 websocket 发送)。
这里发生了什么?如何使两个订阅者在不同的执行上下文(不同线程)中成为 运行,从而完全相互独立?
相关代码片段和日志下方。
提前谢谢大家!
更新: 为了清楚起见,我必须指定 MyEvent
s 具有随机生成的值,所以我解决了一个问题,感谢@NikolaB 的回答使用保证具有相同 Flux
的 ConnectableFlux
/ share
,但我仍然希望为两个订阅者提供单独的执行上下文。
public class MyWebSocketHandler implements WebSocketHandler {
@Autowired
public MySource mySource;
@Autowired
public Notifier notifier;
public Mono<Void> handle(WebSocketSession webSocketSession) {
Flux<MyEvent> events = mySource.events();
events.subscribe(event -> notifier.sendNotification(event));
return webSocketSession.send(events.map(this::toJson).map(webSocketSession::textMessage));
}
private String toJson(MyEvent event) {
log.info("websocket toJson " + event.getValue());
...
}
}
public class MySource {
public Flux<MyEvent> events() {
return Flux.interval(...).map(i -> new MyEvent(*Random Generate Value*);
}
}
public class Notifier {
public void sendNotification (MyEvent event) {
log.info("notifier sendNotification " + event.getValue());
if (condition met)
restTemplate.exchange(...)
}
}
2019-11-19 11:58:55.375 INFO [ parallel-3] i.a.m.websocket.MyWebSocketHandler : websocket toJson 4.09
2019-11-19 11:58:55.375 INFO [ parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.86
2019-11-19 11:58:57.366 INFO [ parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.24
2019-11-19 11:58:57.374 INFO [ parallel-3] i.a.m.websocket.MyWebSocketHandler : websocket toJson 4.11
2019-11-19 11:58:59.365 INFO [ parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.61
2019-11-19 11:58:59.374 INFO [ parallel-3] i.a.m.websocket.MyWebSocketHandler : websocket toJson 4.03
2019-11-19 11:59:01.365 INFO [ parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.88
2019-11-19 11:59:01.375 INFO [ parallel-3] i.a.m.websocket.MyWebSocketHandler : websocket toJson 4.29
2019-11-19 11:59:03.364 INFO [ parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.37
这里有几个问题,首先 RestTemplate
是 synchronous/blocking HTTP 客户端,所以你应该使用 WebClient
这是反应性的,也可以创建 ConnectableFlux
(Flux
可以有多个订阅者)你需要在 map
运营商之前分享它并创建新的 Flux
-es 是从连接的一个创建的。
示例:
Flux<MyEvent> connectedFlux = mySource.events().share();
Flux.from(connectedFlux).subscribe(event -> notifier.sendNotification(event));
return webSocketSession.send(Flux.from(connectedFlux).map(this::toJson).map(webSocketSession::textMessage));
另外 sendNotification
方法应该 return Mono<Void>
因为反应式方法应该总是 return Mono
或 Flux
类型。
要启动独立执行,您可以 Zip
那两个 Mono
。
编辑
首先,如上所述,使用 WebClient
作为响应式 HTTP 客户端的传出 HTTP 调用,然后修改 Notifier
class:
public class Notifier {
public Mono<Void> sendNotification (MyEvent event) {
log.info("notifier sendNotification " + event.getValue());
return Mono.just(event)
.filter(e -> /* your condition */)
.flatMap(e -> WebClient.builder().baseUrl("XXX")...)
.then();
}
}
现在看看执行上下文是否不同。