Spring Integraton RSocket 和 Spring RSocket 交互问题
Spring Integraton RSocket and Spring RSocket interaction issues
我创建了一个新示例并将代码插入客户端和服务器端。
可以找到完整的代码here。
服务器端有3个版本。
- server None Spring 启动应用程序,使用 Spring 集成 RSocket InboundGateway。
- server-boot 重用 Spring RSocket 自动配置,并通过
ServerRSocketMessageHanlder
. 创建 ServerRSocketConnecter
- server-boot-messsagemapping 不使用 Spring 集成,只使用 Spring Boot RSocket 自动配置,以及
@Controller
和 @MessageMapping
.
客户端有2个版本。
- 客户端,使用Spring Integration Rocket OutboundGateway 发送消息。
- client-requester 使用
RSocketRequester
发送消息,根本不使用 Spring 集成。
客户端与服务器交互方式为REQUEST_CHANNEL,通过TCP/localhost:7000连接服务器。
服务器
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
</dependency>
申请class:
@Configuration
@ComponentScan
@IntegrationComponentScan
@EnableIntegration
public class DemoApplication {
public static void main(String[] args) throws IOException {
try (ConfigurableApplicationContext ctx = new AnnotationConfigApplicationContext(DemoApplication.class)) {
System.out.println("Press any key to exit.");
System.in.read();
} finally {
System.out.println("Exited.");
}
}
@Bean
public ServerRSocketConnector serverRSocketConnector() {
return new ServerRSocketConnector("localhost", 7000);
}
@Bean
public IntegrationFlow rsocketUpperCaseFlow(ServerRSocketConnector serverRSocketConnector) {
return IntegrationFlows
.from(RSockets.inboundGateway("/uppercase")
.interactionModels(RSocketInteractionModel.requestChannel)
.rsocketConnector(serverRSocketConnector)
)
.<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toUpperCase))
.get();
}
}
服务器启动
pom.xml 中的依赖项。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
</dependency>
application.properties
spring.rsocket.server.port=7000
spring.rsocket.server.transport=tcp
申请class.
@SpringBootApplication
@EnableIntegration
public class DemoApplication {
public static void main(String[] args) throws IOException {
SpringApplication.run(DemoApplication.class, args);
}
// see PR: https://github.com/spring-projects/spring-boot/pull/18834
@Bean
ServerRSocketMessageHandler serverRSocketMessageHandler(RSocketStrategies rSocketStrategies) {
var handler = new ServerRSocketMessageHandler(true);
handler.setRSocketStrategies(rSocketStrategies);
return handler;
}
@Bean
public ServerRSocketConnector serverRSocketConnector(ServerRSocketMessageHandler serverRSocketMessageHandler) {
return new ServerRSocketConnector(serverRSocketMessageHandler);
}
@Bean
public IntegrationFlow rsocketUpperCaseFlow(ServerRSocketConnector serverRSocketConnector) {
return IntegrationFlows
.from(RSockets.inboundGateway("/uppercase")
.interactionModels(RSocketInteractionModel.requestChannel)
.rsocketConnector(serverRSocketConnector)
)
.<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toUpperCase))
.get();
}
}
服务器启动消息映射
pom.xml 中的依赖项。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
application.properties.
spring.rsocket.server.port=7000
spring.rsocket.server.transport=tcp
应用程序class。
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
@Controller
class UpperCaseHandler {
@MessageMapping("/uppercase")
public Flux<String> uppercase(Flux<String> input) {
return input.map(String::toUpperCase);
}
}
客户
在客户端中,pom.xml中的依赖是这样的。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
</dependency>
申请class:
@SpringBootApplication
@EnableIntegration
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean
public ClientRSocketConnector clientRSocketConnector() {
ClientRSocketConnector clientRSocketConnector = new ClientRSocketConnector("localhost", 7000);
clientRSocketConnector.setAutoStartup(false);
return clientRSocketConnector;
}
@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlows
.from(Function.class)
.handle(RSockets.outboundGateway("/uppercase")
.interactionModel((message) -> RSocketInteractionModel.requestChannel)
.expectedResponseType("T(java.lang.String)")
.clientRSocketConnector(clientRSocketConnector))
.get();
}
}
@RestController
class HelloController {
@Autowired()
@Lazy
@Qualifier("rsocketUpperCaseRequestFlow.gateway")
private Function<Flux<String>, Flux<String>> rsocketUpperCaseFlowFunction;
@GetMapping(value = "hello", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> uppercase() {
return rsocketUpperCaseFlowFunction.apply(Flux.just("a", "b", "c", "d"));
}
}
当运行客户端和服务器端应用,并尝试通过curl
访问http://localhost:8080/hello
。
当使用 server 和使用 InboundGateway 处理消息的 server-boot 时,输出如下所示。
curl http://localhost:8080/hello
data:ABCD
当使用 server-boot-messagemapping 时,输出如我所料:
data:A
data:B
data:C
data:D
客户请求者
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
申请class:
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
@RestController
class HelloController {
Mono<RSocketRequester> requesterMono;
public HelloController(RSocketRequester.Builder builder) {
this.requesterMono = builder.connectTcp("localhost", 7000);
}
@GetMapping(value = "hello", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> uppercase() {
return requesterMono.flatMapMany(
rSocketRequester -> rSocketRequester.route("/uppercase")
.data(Flux.just("a", "b", "c", "d"))
.retrieveFlux(String.class)
);
}
}
当运行这个客户端和3个服务器,并尝试通过curl
访问http://localhost:8080/hello
。
当使用 server 和使用 InboundGateway 处理消息的 server-boot 时,会抛出 class 转换异常。
当使用 server-boot-messagemapping 时,输出如我所料:
data:A
data:B
data:C
data:D
不知道InboundGateway和OutboundGateway的配置问题在哪里?
感谢您提供如此详细的样本!
所以,我所看到的。两个客户端(普通 RSocketRequester
和 Spring 集成)与普通 RSocket 服务器配合良好。
要使它们与 Spring 集成服务器一起工作,您必须进行以下更改:
- 服务器端:
将 .requestElementType(ResolvableType.forClass(String.class))
添加到 RSockets.inboundGateway()
定义中,这样它就会知道将传入的有效载荷转换成什么。
客户端:
.data(Flux.just("a\n", "b\n", "c\n", "d\n"))
.
目前 Spring 集成的服务器端不会将传入的 Flux
视为独立负载流。因此,我们尝试将所有这些连接成一个值。
新行定界符是我们期望独立值的指示符。 Spring Messaging 在其方面的作用恰恰相反:它检查 multi-value
预期类型并在其 map()
中解码传入 Flux
中的每个元素,而不是尝试对整个 Publisher
解码.
这将是一项重大更改,但可能需要考虑修复 RSocketInboundGateway
逻辑以与常规 @MessageMapping
一致以支持 RSocket。欢迎提出 GH 问题!
我创建了一个新示例并将代码插入客户端和服务器端。
可以找到完整的代码here。
服务器端有3个版本。
- server None Spring 启动应用程序,使用 Spring 集成 RSocket InboundGateway。
- server-boot 重用 Spring RSocket 自动配置,并通过
ServerRSocketMessageHanlder
. 创建 - server-boot-messsagemapping 不使用 Spring 集成,只使用 Spring Boot RSocket 自动配置,以及
@Controller
和@MessageMapping
.
ServerRSocketConnecter
客户端有2个版本。
- 客户端,使用Spring Integration Rocket OutboundGateway 发送消息。
- client-requester 使用
RSocketRequester
发送消息,根本不使用 Spring 集成。
客户端与服务器交互方式为REQUEST_CHANNEL,通过TCP/localhost:7000连接服务器。
服务器
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
</dependency>
申请class:
@Configuration
@ComponentScan
@IntegrationComponentScan
@EnableIntegration
public class DemoApplication {
public static void main(String[] args) throws IOException {
try (ConfigurableApplicationContext ctx = new AnnotationConfigApplicationContext(DemoApplication.class)) {
System.out.println("Press any key to exit.");
System.in.read();
} finally {
System.out.println("Exited.");
}
}
@Bean
public ServerRSocketConnector serverRSocketConnector() {
return new ServerRSocketConnector("localhost", 7000);
}
@Bean
public IntegrationFlow rsocketUpperCaseFlow(ServerRSocketConnector serverRSocketConnector) {
return IntegrationFlows
.from(RSockets.inboundGateway("/uppercase")
.interactionModels(RSocketInteractionModel.requestChannel)
.rsocketConnector(serverRSocketConnector)
)
.<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toUpperCase))
.get();
}
}
服务器启动
pom.xml 中的依赖项。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
</dependency>
application.properties
spring.rsocket.server.port=7000
spring.rsocket.server.transport=tcp
申请class.
@SpringBootApplication
@EnableIntegration
public class DemoApplication {
public static void main(String[] args) throws IOException {
SpringApplication.run(DemoApplication.class, args);
}
// see PR: https://github.com/spring-projects/spring-boot/pull/18834
@Bean
ServerRSocketMessageHandler serverRSocketMessageHandler(RSocketStrategies rSocketStrategies) {
var handler = new ServerRSocketMessageHandler(true);
handler.setRSocketStrategies(rSocketStrategies);
return handler;
}
@Bean
public ServerRSocketConnector serverRSocketConnector(ServerRSocketMessageHandler serverRSocketMessageHandler) {
return new ServerRSocketConnector(serverRSocketMessageHandler);
}
@Bean
public IntegrationFlow rsocketUpperCaseFlow(ServerRSocketConnector serverRSocketConnector) {
return IntegrationFlows
.from(RSockets.inboundGateway("/uppercase")
.interactionModels(RSocketInteractionModel.requestChannel)
.rsocketConnector(serverRSocketConnector)
)
.<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toUpperCase))
.get();
}
}
服务器启动消息映射
pom.xml 中的依赖项。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
application.properties.
spring.rsocket.server.port=7000
spring.rsocket.server.transport=tcp
应用程序class。
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
@Controller
class UpperCaseHandler {
@MessageMapping("/uppercase")
public Flux<String> uppercase(Flux<String> input) {
return input.map(String::toUpperCase);
}
}
客户
在客户端中,pom.xml中的依赖是这样的。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
</dependency>
申请class:
@SpringBootApplication
@EnableIntegration
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean
public ClientRSocketConnector clientRSocketConnector() {
ClientRSocketConnector clientRSocketConnector = new ClientRSocketConnector("localhost", 7000);
clientRSocketConnector.setAutoStartup(false);
return clientRSocketConnector;
}
@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlows
.from(Function.class)
.handle(RSockets.outboundGateway("/uppercase")
.interactionModel((message) -> RSocketInteractionModel.requestChannel)
.expectedResponseType("T(java.lang.String)")
.clientRSocketConnector(clientRSocketConnector))
.get();
}
}
@RestController
class HelloController {
@Autowired()
@Lazy
@Qualifier("rsocketUpperCaseRequestFlow.gateway")
private Function<Flux<String>, Flux<String>> rsocketUpperCaseFlowFunction;
@GetMapping(value = "hello", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> uppercase() {
return rsocketUpperCaseFlowFunction.apply(Flux.just("a", "b", "c", "d"));
}
}
当运行客户端和服务器端应用,并尝试通过curl
访问http://localhost:8080/hello
。
当使用 server 和使用 InboundGateway 处理消息的 server-boot 时,输出如下所示。
curl http://localhost:8080/hello
data:ABCD
当使用 server-boot-messagemapping 时,输出如我所料:
data:A
data:B
data:C
data:D
客户请求者
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
申请class:
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
@RestController
class HelloController {
Mono<RSocketRequester> requesterMono;
public HelloController(RSocketRequester.Builder builder) {
this.requesterMono = builder.connectTcp("localhost", 7000);
}
@GetMapping(value = "hello", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> uppercase() {
return requesterMono.flatMapMany(
rSocketRequester -> rSocketRequester.route("/uppercase")
.data(Flux.just("a", "b", "c", "d"))
.retrieveFlux(String.class)
);
}
}
当运行这个客户端和3个服务器,并尝试通过curl
访问http://localhost:8080/hello
。
当使用 server 和使用 InboundGateway 处理消息的 server-boot 时,会抛出 class 转换异常。
当使用 server-boot-messagemapping 时,输出如我所料:
data:A
data:B
data:C
data:D
不知道InboundGateway和OutboundGateway的配置问题在哪里?
感谢您提供如此详细的样本!
所以,我所看到的。两个客户端(普通 RSocketRequester
和 Spring 集成)与普通 RSocket 服务器配合良好。
要使它们与 Spring 集成服务器一起工作,您必须进行以下更改:
- 服务器端:
将 .requestElementType(ResolvableType.forClass(String.class))
添加到 RSockets.inboundGateway()
定义中,这样它就会知道将传入的有效载荷转换成什么。
客户端:
.data(Flux.just("a\n", "b\n", "c\n", "d\n"))
.
目前 Spring 集成的服务器端不会将传入的 Flux
视为独立负载流。因此,我们尝试将所有这些连接成一个值。
新行定界符是我们期望独立值的指示符。 Spring Messaging 在其方面的作用恰恰相反:它检查 multi-value
预期类型并在其 map()
中解码传入 Flux
中的每个元素,而不是尝试对整个 Publisher
解码.
这将是一项重大更改,但可能需要考虑修复 RSocketInboundGateway
逻辑以与常规 @MessageMapping
一致以支持 RSocket。欢迎提出 GH 问题!