How to support WebSocket transport with Spring Boot RSocket server?
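TLDR: What is required to configure a Spring Boot application that exposes an RSocket interface supporting the WebSocket transport?
I'm learning RSocket and Spring Boot at the same time, so please bear with me.
In my struggles, I have been able to build a very simple and contrived Spring Boot application that consumes an API provided/exposed by a second Spring Boot application using RSocket as the protocol. However, I can only achieve this when using TcpClientTransport.
From my perspective, WebsocketTransport is more likely to be used and more useful for client -> server architectures, but I have not found any working examples or documentation on how to properly configure a Spring Boot application that accepts RSocket messages using WebSocket as the transport.
What's odd is that in my tests my consumer (client) does appear to establish a WebSocket connection to the server/producer, but the 'handshake' appears to hang and the connection is never fully established. I have tested with both the JavaScript libraries (rsocket-websocket-client, rsocket-rpc-core, etc.) and the Java libraries (io.rsocket.transport.netty.client.WebsocketClientTransport), and the server exhibits the same behavior regardless.
To reiterate, using TCPTransport I am able to connect to the server and invoke requests just fine, but when using WebsocketTransport the connection is never established.
What is required of a Spring Boot application that aims to support RSocket via WebsocketClientTransport, beyond using spring-boot-starter-rsocket as a dependency?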
Server
pom.xml
...
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.0.M5</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
...
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-rsocket</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>
...
application.properties
spring.rsocket.server.port=8081
management.endpoints.enabled-by-default=true
management.endpoints.web.exposure.include=*
SpringBootRSocketServerApplication.java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringBootRSocketServerApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringBootRSocketServerApplication.class, args);
    }
}
UserRSocketController
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.List;
import java.util.Random;
import java.util.stream.Stream;
@Slf4j
@Controller
public class UserRSocketController {
    @Autowired
    private UserRepository userRepository;

    @MessageMapping("usersList")
    public Mono<List<User>> usersList() {
        log.info("Handling usersList request.");
        return Mono.just(this.userRepository.getUsers());
    }

    @MessageMapping("usersStream")
    Flux<User> usersStream(UserStreamRequest request) {
        log.info("Handling request for usersStream.");
        List<User> users = userRepository.getUsers();
        Stream<User> userStream = Stream.generate(() -> {
            Random rand = new Random();
            return users.get(rand.nextInt(users.size()));
        });
        return Flux.fromStream(userStream).delayElements(Duration.ofSeconds(1));
    }

    @MessageMapping("userById")
    public Mono<User> userById(GetUserByIdRequest request) {
        log.info("Handling request for userById id: {}.", request.getId());
        return Mono.just(this.userRepository.getUserById(request.getId()));
    }
}
Startup logging
:: Spring Boot :: (v2.2.0.M5)
2019-09-08 21:40:02,986 INFO [main] org.springframework.boot.StartupInfoLogger: Starting SpringBootRSocketServerApplication on REDACTED with PID 22540 (REDACTED)
2019-09-08 21:40:02,988 INFO [main] org.springframework.boot.SpringApplication: No active profile set, falling back to default profiles: default
2019-09-08 21:40:04,103 INFO [main] org.springframework.boot.actuate.endpoint.web.EndpointLinksResolver: Exposing 14 endpoint(s) beneath base path '/actuator'
2019-09-08 21:40:04,475 INFO [main] org.springframework.boot.rsocket.netty.NettyRSocketServer: Netty RSocket started on port(s): 8081
2019-09-08 21:40:04,494 INFO [main] org.springframework.boot.web.embedded.netty.NettyWebServer: Netty started on port(s): 8080
2019-09-08 21:40:04,498 INFO [main] org.springframework.boot.StartupInfoLogger: Started SpringBootRSocketServerApplication in 1.807 seconds (JVM running for 2.883)
Consumer/Client
ClientConfiguration.java
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.ClientTransport;
//import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.rsocket.MetadataExtractor;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.MimeTypeUtils;
@Configuration
public class ClientConfiguration {
    @Bean
    public RSocket rSocket() {
        // ClientTransport transport = TcpClientTransport.create(8081);
        // ^--- TCPTransport works fine
        ClientTransport transport = WebsocketClientTransport.create(8081);
        // ^--- Connection hangs and application startup stalls
        return RSocketFactory
                .connect()
                .mimeType(MetadataExtractor.ROUTING.toString(), MimeTypeUtils.APPLICATION_JSON_VALUE)
                .frameDecoder(PayloadDecoder.ZERO_COPY)
                .transport(transport)
                .start()
                .block();
    }

    @Bean
    RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
        return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
    }
}
Startup logging
:: Spring Boot :: (v2.2.0.M5)
2019-09-08 21:40:52,331 INFO [main] org.springframework.boot.StartupInfoLogger: Starting SpringBootRsocketConsumerApplication on REDACTED with PID 18904 (REDACTED)
2019-09-08 21:40:52,334 INFO [main] org.springframework.boot.SpringApplication: No active profile set, falling back to default profiles: default
Based on this blog post, the correct port to connect to is the one configured via server.port=8080.
Server configuration
server.port=8080
spring.rsocket.server.port=8081
spring.rsocket.server.mapping-path=/ws
spring.rsocket.server.transport=websocket
Java consumer client configuration
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.rsocket.MetadataExtractor;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.MimeTypeUtils;
import java.net.URI;
import java.time.Duration;
@Configuration
public class ClientConfiguration {
    @Bean
    public RSocket rSocket() {
        URI websocketUri = URI.create("ws://127.0.0.1:8080/ws");
        WebsocketClientTransport ws = WebsocketClientTransport.create(websocketUri);
        return RSocketFactory
                .connect()
                .mimeType(
                        MetadataExtractor.ROUTING.toString(),
                        MimeTypeUtils.APPLICATION_JSON_VALUE)
                .frameDecoder(PayloadDecoder.ZERO_COPY)
                .transport(ws)
                .start()
                .block();
    }

    @Bean
    RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
        return RSocketRequester.wrap(
                rSocket(),
                MimeTypeUtils.APPLICATION_JSON,
                MetadataExtractor.ROUTING,
                rSocketStrategies);
    }
}
JavaScript client configuration
import { RSocketClient, JsonSerializers } from 'rsocket-core';
import RSocketWebSocketClient from 'rsocket-websocket-client';
const transport = new RSocketWebSocketClient({
  url: 'ws://127.0.0.1:8080/ws'
});
const client = new RSocketClient({
  // send/receive JSON objects instead of strings/buffers
  serializers: JsonSerializers,
  setup: {
    // ms btw sending keepalive to server
    keepAlive: 60000,
    // ms timeout if no keepalive response
    lifetime: 180000,
    // format of `data`
    dataMimeType: 'application/json',
    // format of `metadata`
    metadataMimeType: 'application/json',
  },
  transport,
});
client.connect().then((rsocket) => {
  // work with rsocket
});
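You need two things to have an RSocket application expose endpoints over the websocket transport:
First, you need both the webflux and rsocket dependencies, since you will probably also need to serve web pages and static resources: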
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
Then you need to configure the RSocket server accordingly in your application.properties file:
#server.port=8080 this is already the default
spring.rsocket.server.transport=websocket
spring.rsocket.server.mapping-path=/rsocket
You'll find more about that in the Spring Boot reference documentation about RSocket.
A websocket client can now connect to ws://localhost:8080/rsocket.
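The relationship between the properties above and the URL a client should use can be sketched in plain Java. This is only an illustration: RSocketWebSocketUrl and its methods are hypothetical names made up for this example and are not part of Spring Boot or rsocket-java. It assumes RSocket is plugged into the WebFlux server, so the port comes from server.port (8080 when unset) and not from spring.rsocket.server.port.

```java
import java.io.IOException;
import java.io.StringReader;
import java.net.URI;
import java.util.Properties;

// Hypothetical helper (not a Spring Boot API): derives the client-side
// WebSocket URL from the server-side properties.
final class RSocketWebSocketUrl {

    private RSocketWebSocketUrl() {
    }

    // Builds ws://<host>:<server.port><mapping-path>.
    // Assumption: RSocket is served by the WebFlux server itself.
    static URI fromProperties(String host, Properties props) {
        // Spring Boot defaults the RSocket transport to tcp when it is not set.
        String transport = props.getProperty("spring.rsocket.server.transport", "tcp").trim();
        if (!"websocket".equalsIgnoreCase(transport)) {
            throw new IllegalArgumentException("transport is not websocket: " + transport);
        }
        String path = props.getProperty("spring.rsocket.server.mapping-path", "").trim();
        if (path.isEmpty()) {
            throw new IllegalArgumentException("spring.rsocket.server.mapping-path is not set");
        }
        if (!path.startsWith("/")) {
            path = "/" + path;
        }
        // server.port falls back to 8080 when it is not configured.
        int port = Integer.parseInt(props.getProperty("server.port", "8080").trim());
        return URI.create("ws://" + host + ":" + port + path);
    }

    // Convenience overload that parses application.properties-style text.
    static URI fromPropertiesText(String host, String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return fromProperties(host, props);
    }
}
```

Calling RSocketWebSocketUrl.fromPropertiesText("localhost", ...) with the two properties shown above yields ws://localhost:8080/rsocket, while a configuration left on the default tcp transport is rejected, which mirrors why a WebSocket client pointed at a TCP RSocket port never completes its handshake.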
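Note that as of the current 2.2.0 SNAPSHOTs, the RSocket protocol has evolved and the rsocket-js library is still catching up, especially in the area of metadata support. You'll find a working sample here as well.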
On the Java client side, Spring Boot provides you with an RSocketRequester.Builder that is already configured and customized to your needs with codecs and interceptors:
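import java.net.URI;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.stereotype.Component;

@Component
public class MyService {
    private final RSocketRequester rsocketRequester;

    public MyService(RSocketRequester.Builder builder) {
        // Connects over WebSocket to the path set by spring.rsocket.server.mapping-path
        this.rsocketRequester = builder
                .connectWebSocket(URI.create("ws://localhost:8080/rsocket"))
                .block();
    }
}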