Spring Webflux WebSocket Client - 如何处理不同的消息结构?
Spring Webflux WebSocket Client - how to handle different message structures?
我面临的挑战是,现有的 Web 服务 return 一次呼叫 2 条消息,但具有不同的结构/类型。
如何使用 Spring 中的 WebFlux ReactorNettyWebSocketClient 处理此问题?即我如何附加 2 个不同的映射器或通过多个订阅者处理它?
这里假设这两个响应消息的顺序始终相同,因此第一个将由“info”映射器解析,第二个将由“auth”映射器解析。
第二条消息是订阅者的信息。然后服务器将继续在通道“0”上发送多条消息。当我收到第二条消息时,如何与我的客户订阅这些?
客户:
WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(uri, session -> session.send(Mono.just(session.textMessage(authCommand)))
.thenMany(session.receive().take(2).doOnNext(message -> {
System.out.println(message.getPayloadAsText());
})).then()).block();
来自服务器的第一个 return:
{"event":"info","version":2,"serverId":"20df78274661","platform":{"status":1}}
{"event":"auth","status":"OK","chanId":0,"userId":2256812}
服务器发送两条消息后发送的频道消息:
[0,"wu",["funding","USD",11041.78713953,0,8.090876180000123,null,null]]
[0,"wu",["funding","BTC",0.25495514,0,4.000000003445692e-8,null,null]]
[0,"wu",["funding","EUR",2139.03965717,0,0.00965717000008226,null,null]]
感谢您提前提出任何建议和想法。
在研究了一些关于 WebFlux 和 Reactor 的 material 之后,我得出了以下方法。我正在使用一个处理器,您可以在其中附加多个订阅者。然后将此处理器附加到 WebSocketClient 中。
可以在此处找到关于不同处理器的很好的总结
How to use Processor in Java's Reactor
通过这种方法,我可以动态添加订阅者。因此,当我检测到频道消息时,我可以使用过滤器为该频道号向我的处理器添加一个新订阅者。下面的虚拟代码说明了行为。
ReplayProcessor<String> output = ReplayProcessor.create();
output.filter(msg -> msg.startsWith("{")).subscribe(msg -> {
System.out.println(msg);
JSONObject json = new JSONObject(msg);
if (json.has("event") && json.get("event").equals("auth")) {
if (json.has("chanId")) {
int chanId = json.getInt("chanId");
// add new subscriber for that channel number, receives only messages from that channel
output.filter(t -> t.startsWith("[" + chanId)).subscribe( t -> System.err.println(t));
}
}
});
client.execute(uri, session ->
// send message
session.send(Mono.just(session.textMessage(authCommand)))
.thenMany(session.receive()
.map(message -> message.getPayloadAsText())
.subscribeWith(output))
.then()).block();
我面临的挑战是,现有的 Web 服务 return 一次呼叫 2 条消息,但具有不同的结构/类型。
如何使用 Spring 中的 WebFlux ReactorNettyWebSocketClient 处理此问题?即我如何附加 2 个不同的映射器或通过多个订阅者处理它?
这里假设这两个响应消息的顺序始终相同,因此第一个将由“info”映射器解析,第二个将由“auth”映射器解析。
第二条消息是订阅者的信息。然后服务器将继续在通道“0”上发送多条消息。当我收到第二条消息时,如何与我的客户订阅这些?
客户:
WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(uri, session -> session.send(Mono.just(session.textMessage(authCommand)))
.thenMany(session.receive().take(2).doOnNext(message -> {
System.out.println(message.getPayloadAsText());
})).then()).block();
来自服务器的第一个 return:
{"event":"info","version":2,"serverId":"20df78274661","platform":{"status":1}}
{"event":"auth","status":"OK","chanId":0,"userId":2256812}
服务器发送两条消息后发送的频道消息:
[0,"wu",["funding","USD",11041.78713953,0,8.090876180000123,null,null]]
[0,"wu",["funding","BTC",0.25495514,0,4.000000003445692e-8,null,null]]
[0,"wu",["funding","EUR",2139.03965717,0,0.00965717000008226,null,null]]
感谢您提前提出任何建议和想法。
在研究了一些关于 WebFlux 和 Reactor 的 material 之后,我得出了以下方法。我正在使用一个处理器,您可以在其中附加多个订阅者。然后将此处理器附加到 WebSocketClient 中。
可以在此处找到关于不同处理器的很好的总结
How to use Processor in Java's Reactor
通过这种方法,我可以动态添加订阅者。因此,当我检测到频道消息时,我可以使用过滤器为该频道号向我的处理器添加一个新订阅者。下面的虚拟代码说明了行为。
ReplayProcessor<String> output = ReplayProcessor.create();
output.filter(msg -> msg.startsWith("{")).subscribe(msg -> {
System.out.println(msg);
JSONObject json = new JSONObject(msg);
if (json.has("event") && json.get("event").equals("auth")) {
if (json.has("chanId")) {
int chanId = json.getInt("chanId");
// add new subscriber for that channel number, receives only messages from that channel
output.filter(t -> t.startsWith("[" + chanId)).subscribe( t -> System.err.println(t));
}
}
});
client.execute(uri, session ->
// send message
session.send(Mono.just(session.textMessage(authCommand)))
.thenMany(session.receive()
.map(message -> message.getPayloadAsText())
.subscribeWith(output))
.then()).block();