有状态 Rsocket 应用程序
Stateful Rsocket Application
在我的项目中,我希望有多个客户端连接到一个服务。我正在使用 java Rsocket 实现。
服务应该为每个客户端维护一个状态。现在在这一点上,我可以通过一些标识符来管理客户端。这个选项我已经实现了。但我不想使用字符串手动管理会话。
所以另一个想法是通过 Rsocket 连接来识别客户端。有没有办法使用 Rsocket 通道来识别特定客户端?
想象一个示例服务和几个客户端。每个客户端都有 Rsocket 通道,服务已启动 运行。有没有办法使用 Rsocket 通道在服务器端识别这些客户端?如果您能展示此类行为的编程示例,那就太棒了。
谢谢!
编辑(更详细地描述案例)
这是我的例子。
我们目前使用了三个 CORBA 对象,如图所示:
- LoginObject(通过 NamingService 检索对其的引用)。客户端可以调用 login() 方法来获取会话
- Session 对象有多种查询当前服务上下文详细信息的方法,最重要的是获取 Transaction 对象
- Transaction 对象可用于通过以 commandName 和键值对列表作为参数的通用方法执行各种命令。
客户端执行 n 个命令后,他可以提交或回滚事务(也可以通过 Transaction 对象上的方法)。
所以这里我们使用会话对象在我们的服务上执行事务。
现在我们决定从 CORBA 转移到 Rsocket。因此我们需要 Rsocket 微服务能够存储会话的状态,否则我们无法知道什么将被提交或回滚。这可以通过每个客户的单独发布者来完成吗?
从你的描述看来 channel
效果最好,我以前没有使用过频道所以我不能保证(抱歉)。但我建议你尝试这样的事情:
一个事务控制器:
public class TransactionController implements Publisher<Payload> {
List<Transaction> transcations = new ArrayList<>();
@Override
public void subscribe(Subscriber<? super Payload> subscriber) {
}
public void processPayload(Payload payload) {
// handle transcations...
}
}
并且在您的 RSocket
实现中覆盖 requestChannel
:
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
// Create new controller for each channel
TranscationController cntrl = new TranscationController();
Flux.from(payloads)
.subscribe(cntrl::processPayload);
return Flux.from(cntrl);
}
这是我前几天做的一个例子,它将使用 Netifi 的代理创建一个有状态的 RSocket:
https://github.com/netifi/netifi-stateful-socket
遗憾的是,您需要在本地构建我们的开发分支才能试用 (https://github.com/netifi/netifi-java) - 如果您不想构建,应该会在本周末发布带有代码的版本它在本地。
我也在研究纯 RSocket 示例,但如果您想了解它是如何实现的,请查看示例中的 StatefulSocket。它应该为您提供如何使用纯 RSocket 处理会话的线索。
关于您关于事务管理器的其他问题 - 您需要将您的事务与正在发出的 Reactive Streams 信号联系起来 - 如果您收到取消,您将回滚的 onError,如果收到 onComplete你会提交交易。 Flux/Mono 中的一些副作用方法应该可以轻松处理。根据你在做什么,你也可以使用 BaseSubscriber,因为它有钩子来处理不同的 Reactive Streams 信号。
谢谢,
罗伯特
恢复连接的示例,即维护服务器上的状态,已登陆 rsocket-java 存储库
https://github.com/rsocket/rsocket-java/commit/d47629147dd1a4d41c7c8d5af3d80838e01d3ba5
恢复整个连接,包括与每个单独通道关联的任何状态等。
有一个 rsocket-cli 项目可以让您尝试一下。启动和停止socat进程,观察客户端和服务器进程。
$ socat -d TCP-LISTEN:5001,fork,reuseaddr TCP:localhost:5000
$ ./rsocket-cli --debug --resume --server -i cli:time tcp://localhost:5000
$ ./rsocket-cli -i client --stream --resume tcp://localhost:5001
在我的项目中,我希望有多个客户端连接到一个服务。我正在使用 java Rsocket 实现。
服务应该为每个客户端维护一个状态。现在在这一点上,我可以通过一些标识符来管理客户端。这个选项我已经实现了。但我不想使用字符串手动管理会话。
所以另一个想法是通过 Rsocket 连接来识别客户端。有没有办法使用 Rsocket 通道来识别特定客户端?
想象一个示例服务和几个客户端。每个客户端都有 Rsocket 通道,服务已启动 运行。有没有办法使用 Rsocket 通道在服务器端识别这些客户端?如果您能展示此类行为的编程示例,那就太棒了。 谢谢!
编辑(更详细地描述案例)
这是我的例子。
我们目前使用了三个 CORBA 对象,如图所示:
- LoginObject(通过 NamingService 检索对其的引用)。客户端可以调用 login() 方法来获取会话
- Session 对象有多种查询当前服务上下文详细信息的方法,最重要的是获取 Transaction 对象
- Transaction 对象可用于通过以 commandName 和键值对列表作为参数的通用方法执行各种命令。 客户端执行 n 个命令后,他可以提交或回滚事务(也可以通过 Transaction 对象上的方法)。
所以这里我们使用会话对象在我们的服务上执行事务。
现在我们决定从 CORBA 转移到 Rsocket。因此我们需要 Rsocket 微服务能够存储会话的状态,否则我们无法知道什么将被提交或回滚。这可以通过每个客户的单独发布者来完成吗?
从你的描述看来 channel
效果最好,我以前没有使用过频道所以我不能保证(抱歉)。但我建议你尝试这样的事情:
一个事务控制器:
public class TransactionController implements Publisher<Payload> {
List<Transaction> transcations = new ArrayList<>();
@Override
public void subscribe(Subscriber<? super Payload> subscriber) {
}
public void processPayload(Payload payload) {
// handle transcations...
}
}
并且在您的 RSocket
实现中覆盖 requestChannel
:
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
// Create new controller for each channel
TranscationController cntrl = new TranscationController();
Flux.from(payloads)
.subscribe(cntrl::processPayload);
return Flux.from(cntrl);
}
这是我前几天做的一个例子,它将使用 Netifi 的代理创建一个有状态的 RSocket: https://github.com/netifi/netifi-stateful-socket
遗憾的是,您需要在本地构建我们的开发分支才能试用 (https://github.com/netifi/netifi-java) - 如果您不想构建,应该会在本周末发布带有代码的版本它在本地。
我也在研究纯 RSocket 示例,但如果您想了解它是如何实现的,请查看示例中的 StatefulSocket。它应该为您提供如何使用纯 RSocket 处理会话的线索。
关于您关于事务管理器的其他问题 - 您需要将您的事务与正在发出的 Reactive Streams 信号联系起来 - 如果您收到取消,您将回滚的 onError,如果收到 onComplete你会提交交易。 Flux/Mono 中的一些副作用方法应该可以轻松处理。根据你在做什么,你也可以使用 BaseSubscriber,因为它有钩子来处理不同的 Reactive Streams 信号。
谢谢, 罗伯特
恢复连接的示例,即维护服务器上的状态,已登陆 rsocket-java 存储库
https://github.com/rsocket/rsocket-java/commit/d47629147dd1a4d41c7c8d5af3d80838e01d3ba5
恢复整个连接,包括与每个单独通道关联的任何状态等。
有一个 rsocket-cli 项目可以让您尝试一下。启动和停止socat进程,观察客户端和服务器进程。
$ socat -d TCP-LISTEN:5001,fork,reuseaddr TCP:localhost:5000
$ ./rsocket-cli --debug --resume --server -i cli:time tcp://localhost:5000
$ ./rsocket-cli -i client --stream --resume tcp://localhost:5001