Netty Nio 中 promise 的异步更新
Asynchronous update of promise in Netty Nio
我有一个交换信息的服务器和客户端架构。我想return 从服务器连接的频道数。我想 return 使用 promise 将服务器的消息发送给客户端。我的代码是:
public static void callBack () throws Exception{
String host = "localhost";
int port = 8080;
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new RequestDataEncoder(), new ResponseDataDecoder(), new ClientHandler(promise));
}
});
ChannelFuture f = b.connect(host, port).sync();
//f.channel().closeFuture().sync();
}
finally {
//workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
callBack();
while (true) {
Object msg = promise.get();
System.out.println("The number if the connected clients is not two");
int ret = Integer.parseInt(msg.toString());
if (ret == 2){
break;
}
}
System.out.println("The number if the connected clients is two");
}
当我 运行 一个客户端时,它总是收到消息 The number if the connected clients is not two
并且 returning 号码始终是一个。当我 运行 第二个客户端时,它总是接收到 returning 值二,但是,第一个客户端仍然接收到一个。对于第一个客户的情况,我找不到更新承诺的正确方法。
编辑:
客户端服务器:
public class ClientHandler extends ChannelInboundHandlerAdapter {
public final Promise<Object> promise;
public ClientHandler(Promise<Object> promise) {
this.promise = promise;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
RequestData msg = new RequestData();
msg.setIntValue(123);
msg.setStringValue("all work and no play makes jack a dull boy");
ctx.writeAndFlush(msg);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
promise.trySuccess(msg);
}
}
客户端处理程序的代码将从服务器接收到的消息存储到 promise。
使用Netty框架,Promise and a Future都是一次性写入的对象,这个原理使得它们在多线程环境下更容易使用。
既然一个Promise没有做你想要的,我们需要看看其他技术是否适合你的条件,你的条件基本上归结为:
- 从多个线程读取
- 仅从单个线程写入(因为在 Netty 通道内,read 方法只能同时由 1 个线程执行,除非通道被标记为可共享)
对于这些要求,最合适的匹配是 volatile 变量,因为这对于读取是线程安全的,并且可以由 1 个线程安全地更新而不用担心写入顺序。
要更新您的代码以使用 volatile 变量,需要进行一些修改,因为我们无法轻松地将引用 link 传递给函数内的变量,但我们必须传递一个更新后端的函数变量。
private static volatile int connectedClients = 0;
public static void callBack () throws Exception{
//....
ch.pipeline().addLast(new RequestDataEncoder(), new ResponseDataDecoder(),
new ClientHandler(i -> {connectedClients = i;});
//....
}
public static void main(String[] args) throws Exception {
callBack();
while (true) {
System.out.println("The number if the connected clients is not two");
int ret = connectedClients;
if (ret == 2){
break;
}
}
System.out.println("The number if the connected clients is two");
}
public class ClientHandler extends ChannelInboundHandlerAdapter {
public final IntConsumer update;
public ClientHandler(IntConsumer update) {
this.update = update;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
RequestData msg = new RequestData();
msg.setIntValue(123);
msg.setStringValue("all work and no play makes jack a dull boy");
ctx.writeAndFlush(msg);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
update.accept(Integer.parseInt(msg));
}
}
虽然上面的方法应该可行,但我们很快就会发现 class 主程序中的 while 循环使用了大量 CPU 时间,这可能会影响本地客户端系统的其他部分,幸运的是,如果我们在系统中添加其他部分,即同步,这个问题也是可以解决的。通过将 connectedClients
的初始读取留在同步块之外,我们仍然可以从 "true" 情况下的快速读取中获益,并且在“假”情况下,我们可以安全重要的 CPU 循环,可用于系统的其他部分。
为了解决这个问题,我们在阅读时使用以下步骤:
- 将
connectedClients
的值存储在一个单独的变量中
- 将此变量与目标值进行比较
- 如果为真,则尽早跳出循环
- 如果为 false,则进入同步块
- 开始 while true 循环
- 再次读出变量,因为现在值可能会改变
- 检查条件,如果条件正确则中断
- 如果不是,等待值的变化
并且写的时候如下:
- 同步
- 更新值
- 唤醒等待该值的所有其他线程
可以用如下代码实现:
private static volatile int connectedClients = 0;
private static final Object lock = new Object();
public static void callBack () throws Exception{
//....
ch.pipeline().addLast(new RequestDataEncoder(), new ResponseDataDecoder(),
new ClientHandler(i -> {
synchronized (lock) {
connectedClients = i;
lock.notifyAll();
}
});
//....
}
public static void main(String[] args) throws Exception {
callBack();
int connected = connectedClients;
if (connected != 2) {
System.out.println("The number if the connected clients is not two before locking");
synchronized (lock) {
while (true) {
connected = connectedClients;
if (connected == 2)
break;
System.out.println("The number if the connected clients is not two");
lock.wait();
}
}
}
System.out.println("The number if the connected clients is two: " + connected );
}
服务器端变化
但是,并非所有问题都与客户端有关。
因为您将 link 发布到您的 github 存储库,所以当新人加入时,您永远不会将请求从服务器发送回旧客户端。由于未完成此操作,因此永远不会通知客户端更改,请确保也执行此操作。
我有一个交换信息的服务器和客户端架构。我想return 从服务器连接的频道数。我想 return 使用 promise 将服务器的消息发送给客户端。我的代码是:
public static void callBack () throws Exception{
String host = "localhost";
int port = 8080;
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new RequestDataEncoder(), new ResponseDataDecoder(), new ClientHandler(promise));
}
});
ChannelFuture f = b.connect(host, port).sync();
//f.channel().closeFuture().sync();
}
finally {
//workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
callBack();
while (true) {
Object msg = promise.get();
System.out.println("The number if the connected clients is not two");
int ret = Integer.parseInt(msg.toString());
if (ret == 2){
break;
}
}
System.out.println("The number if the connected clients is two");
}
当我 运行 一个客户端时,它总是收到消息 The number if the connected clients is not two
并且 returning 号码始终是一个。当我 运行 第二个客户端时,它总是接收到 returning 值二,但是,第一个客户端仍然接收到一个。对于第一个客户的情况,我找不到更新承诺的正确方法。
编辑: 客户端服务器:
public class ClientHandler extends ChannelInboundHandlerAdapter {
public final Promise<Object> promise;
public ClientHandler(Promise<Object> promise) {
this.promise = promise;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
RequestData msg = new RequestData();
msg.setIntValue(123);
msg.setStringValue("all work and no play makes jack a dull boy");
ctx.writeAndFlush(msg);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
promise.trySuccess(msg);
}
}
客户端处理程序的代码将从服务器接收到的消息存储到 promise。
使用Netty框架,Promise and a Future都是一次性写入的对象,这个原理使得它们在多线程环境下更容易使用。
既然一个Promise没有做你想要的,我们需要看看其他技术是否适合你的条件,你的条件基本上归结为:
- 从多个线程读取
- 仅从单个线程写入(因为在 Netty 通道内,read 方法只能同时由 1 个线程执行,除非通道被标记为可共享)
对于这些要求,最合适的匹配是 volatile 变量,因为这对于读取是线程安全的,并且可以由 1 个线程安全地更新而不用担心写入顺序。
要更新您的代码以使用 volatile 变量,需要进行一些修改,因为我们无法轻松地将引用 link 传递给函数内的变量,但我们必须传递一个更新后端的函数变量。
private static volatile int connectedClients = 0;
public static void callBack () throws Exception{
//....
ch.pipeline().addLast(new RequestDataEncoder(), new ResponseDataDecoder(),
new ClientHandler(i -> {connectedClients = i;});
//....
}
public static void main(String[] args) throws Exception {
callBack();
while (true) {
System.out.println("The number if the connected clients is not two");
int ret = connectedClients;
if (ret == 2){
break;
}
}
System.out.println("The number if the connected clients is two");
}
public class ClientHandler extends ChannelInboundHandlerAdapter {
public final IntConsumer update;
public ClientHandler(IntConsumer update) {
this.update = update;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
RequestData msg = new RequestData();
msg.setIntValue(123);
msg.setStringValue("all work and no play makes jack a dull boy");
ctx.writeAndFlush(msg);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
update.accept(Integer.parseInt(msg));
}
}
虽然上面的方法应该可行,但我们很快就会发现 class 主程序中的 while 循环使用了大量 CPU 时间,这可能会影响本地客户端系统的其他部分,幸运的是,如果我们在系统中添加其他部分,即同步,这个问题也是可以解决的。通过将 connectedClients
的初始读取留在同步块之外,我们仍然可以从 "true" 情况下的快速读取中获益,并且在“假”情况下,我们可以安全重要的 CPU 循环,可用于系统的其他部分。
为了解决这个问题,我们在阅读时使用以下步骤:
- 将
connectedClients
的值存储在一个单独的变量中 - 将此变量与目标值进行比较
- 如果为真,则尽早跳出循环
- 如果为 false,则进入同步块
- 开始 while true 循环
- 再次读出变量,因为现在值可能会改变
- 检查条件,如果条件正确则中断
- 如果不是,等待值的变化
并且写的时候如下:
- 同步
- 更新值
- 唤醒等待该值的所有其他线程
可以用如下代码实现:
private static volatile int connectedClients = 0;
private static final Object lock = new Object();
public static void callBack () throws Exception{
//....
ch.pipeline().addLast(new RequestDataEncoder(), new ResponseDataDecoder(),
new ClientHandler(i -> {
synchronized (lock) {
connectedClients = i;
lock.notifyAll();
}
});
//....
}
public static void main(String[] args) throws Exception {
callBack();
int connected = connectedClients;
if (connected != 2) {
System.out.println("The number if the connected clients is not two before locking");
synchronized (lock) {
while (true) {
connected = connectedClients;
if (connected == 2)
break;
System.out.println("The number if the connected clients is not two");
lock.wait();
}
}
}
System.out.println("The number if the connected clients is two: " + connected );
}
服务器端变化
但是,并非所有问题都与客户端有关。
因为您将 link 发布到您的 github 存储库,所以当新人加入时,您永远不会将请求从服务器发送回旧客户端。由于未完成此操作,因此永远不会通知客户端更改,请确保也执行此操作。