Socket.IO-客户端 Java 的 Reactor Flux 代理
Reactor Flux proxy for Socket.IO-client Java
我正在实施 Spring WebFlux 端点,它应该从 Socket.IO-client Java.
获取数据
我不明白将传入数据收集到 Flux 流中的方法。我可以通过某种方式创建新的 Flux 并将其订阅到传入的数据吗?谢谢指教。
@GetMapping("/streaming", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<MyRecourse> getStreaming() {
URI uri = URI.create("http://localhost/socket.io"); // client
Socket socket = IO.socket(uri);
socket.on("event", args -> {
JSONObject obj = (JSONObject)args[0];
MyRecourse recource = MyRecourse.create(obj);
// how to put this recource into Flux stream?
});
return fluxStreamOfRecources;
}
您可以使用 Flux.create() 从事件侦听器生成 Flux
。
Flux.<MyResource>create(emitter -> {
URI uri = URI.create("http://localhost/socket.io"); // client
Socket socket = IO.socket(uri);
socket.on("event", args -> {
JSONObject obj = (JSONObject)args[0];
MyResource resource = MyResource.create(obj);
emitter.next(resource);
});
// subscribe on error events
socket.on(Socket.EVENT_CONNECT_ERROR, args -> {
// get error
emitter.error(throwable);
});
// unsubscribe from events when the client cancels
emitter.onDispose(() -> {
// disconnect from socket
// socket.off(...)
});
});
我正在实施 Spring WebFlux 端点,它应该从 Socket.IO-client Java.
获取数据我不明白将传入数据收集到 Flux 流中的方法。我可以通过某种方式创建新的 Flux 并将其订阅到传入的数据吗?谢谢指教。
@GetMapping("/streaming", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<MyRecourse> getStreaming() {
URI uri = URI.create("http://localhost/socket.io"); // client
Socket socket = IO.socket(uri);
socket.on("event", args -> {
JSONObject obj = (JSONObject)args[0];
MyRecourse recource = MyRecourse.create(obj);
// how to put this recource into Flux stream?
});
return fluxStreamOfRecources;
}
您可以使用 Flux.create() 从事件侦听器生成 Flux
。
Flux.<MyResource>create(emitter -> {
URI uri = URI.create("http://localhost/socket.io"); // client
Socket socket = IO.socket(uri);
socket.on("event", args -> {
JSONObject obj = (JSONObject)args[0];
MyResource resource = MyResource.create(obj);
emitter.next(resource);
});
// subscribe on error events
socket.on(Socket.EVENT_CONNECT_ERROR, args -> {
// get error
emitter.error(throwable);
});
// unsubscribe from events when the client cancels
emitter.onDispose(() -> {
// disconnect from socket
// socket.off(...)
});
});