Netty NIO:读取收到的消息
Netty NIO: Read received messages
我正在 Java 中使用 Netty NIO 开发客户端和服务器通信系统。可以找到我的代码 in the following repository。目前我有一台服务器和两个客户端,我正在将信息从服务器发送到客户端,反之亦然。
我想弄清楚的是,当我从第一个客户端接收到服务器的消息时,我如何将该消息发送到第二个客户端(以及从客户端 2 到客户端 1 的相反方向)。如何向特定客户端发送消息?
我注意到我的问题是由于我尝试从服务器发送消息的方式引起的。我在 serverHandler 中的代码如下:
for (Channel ch : channels1) {
responseData.setIntValue(channels1.size());
remoteAddr.add(ch.remoteAddress().toString());
future = ch.writeAndFlush(responseData);
//future.addListener(ChannelFutureListener.CLOSE);
System.out.println("the requested data from the clients are: "+requestData);
responseData1.setStringValue(requestData.toString());
future = ch.writeAndFlush(responseData1);
System.out.println(future);
}
默认情况下,我会发送有关连接数的消息,但当我从客户端 1 或 2 收到消息时,我也想将其发送回 2 和 1。所以我想在两者之间执行通信这两个组件。如何从服务器发送到特定客户端?我不确定如何将消息发送回客户端。
一般方法
让我们描述解决问题的方法。
在服务器端接收数据时,使用通道的远程地址(java.net.SocketAddress Channel.remoteAddress()
method)来识别客户端。
可以使用如下映射来完成此类识别:Map<SocketAddress, Client>
,其中 Client
class 或接口应包含适当的客户端连接(通道)关联上下文,包括其 Channel
。确保地图保持最新:适当处理 «客户端连接» 和 «客户端断开» 事件。
识别客户端后,您可以使用客户端连接(通道)映射将适当的消息发送给客户端(当前发送客户端除外)。
此外,我想建议您使用 Netty 找到一个很好的聊天应用程序实现并看一看。
Netty 特定的解决方案
让我们考虑服务器端实现,特别是 ProcessingHandler
class.
的实现
它已经通过将活动频道表示为频道组来管理它们:
static final ChannelGroup channels1 =
new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
使频道组保持最新
当前的实现处理 «channel becomes active» 事件以使频道组保持最新:
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
channels1.add(ctx.channel());
// ...
}
但这只是一半:还需要对称地处理 «channel becomes inactive» 事件。实施应如下所示:
@Override
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
channels1.remove(ctx.channel());
}
广播:将收到的消息发送到所有频道,当前频道除外
要实现所需的行为,只需通过引入适当的检查来更新实现,如下所示:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// ...
for (Channel ch : channels1) {
// Does `ch` represent the channel of the current sending client?
if (ch.equals(ctx.channel())) {
// Skip.
continue;
}
// Send the message to the `ch` channel.
// ...
}
// ...
}
发送和接收字符串问题
Currently,ResponseData
class 的功能不存在(未实现)。
需要进行以下 草稿 更改才能使客户端和服务器都正常工作。
ResponseData
class:getStringValue
和toString
方法应该更正:
String getStringValue() {
return this.strValue;
}
@Override
public String toString() {
return intValue + ";" + strValue;
}
ResponseDataEncoder
class:应该使用字符串值:
private final Charset charset = Charset.forName("UTF-8");
@Override
protected void encode(final ChannelHandlerContext ctx, final ResponseData msg, final ByteBuf out) throws Exception {
out.writeInt(msg.getIntValue());
out.writeInt(msg.getStringValue().length());
out.writeCharSequence(msg.getStringValue(), charset);
}
ResponseDataDecoder
class:应该使用字符串值:
private final Charset charset = Charset.forName("UTF-8");
@Override
protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) throws Exception {
ResponseData data = new ResponseData();
data.setIntValue(in.readInt());
int strLen = in.readInt();
data.setStringValue(in.readCharSequence(strLen, charset).toString());
out.add(data);
}
ClientHandler
class:应该正确接收和处理消息:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
final ResponseData responseData = (ResponseData) msg;
System.out.println("The message sent from the server " + responseData);
update.accept(responseData.getIntValue());
}
其他参考文献
- «SecureChat ‐ an TLS-based chat server, derived from the Telnet example», Netty Documentation. In particular, the implementation of the
SecureChatServerHandler
class.
- «Netty in Action», Norman Maurer, Marvin Allen Wolfthal (ISBN-13: 978-1617291470),«第 3 部分 — 网络协议»,«12.2 我们的示例 WebSocket 应用程序» 子章。涵盖«基于浏览器的聊天应用程序»的实现。
我正在 Java 中使用 Netty NIO 开发客户端和服务器通信系统。可以找到我的代码 in the following repository。目前我有一台服务器和两个客户端,我正在将信息从服务器发送到客户端,反之亦然。
我想弄清楚的是,当我从第一个客户端接收到服务器的消息时,我如何将该消息发送到第二个客户端(以及从客户端 2 到客户端 1 的相反方向)。如何向特定客户端发送消息?
我注意到我的问题是由于我尝试从服务器发送消息的方式引起的。我在 serverHandler 中的代码如下:
for (Channel ch : channels1) {
responseData.setIntValue(channels1.size());
remoteAddr.add(ch.remoteAddress().toString());
future = ch.writeAndFlush(responseData);
//future.addListener(ChannelFutureListener.CLOSE);
System.out.println("the requested data from the clients are: "+requestData);
responseData1.setStringValue(requestData.toString());
future = ch.writeAndFlush(responseData1);
System.out.println(future);
}
默认情况下,我会发送有关连接数的消息,但当我从客户端 1 或 2 收到消息时,我也想将其发送回 2 和 1。所以我想在两者之间执行通信这两个组件。如何从服务器发送到特定客户端?我不确定如何将消息发送回客户端。
一般方法
让我们描述解决问题的方法。
在服务器端接收数据时,使用通道的远程地址(java.net.SocketAddress Channel.remoteAddress()
method)来识别客户端。
可以使用如下映射来完成此类识别:Map<SocketAddress, Client>
,其中 Client
class 或接口应包含适当的客户端连接(通道)关联上下文,包括其 Channel
。确保地图保持最新:适当处理 «客户端连接» 和 «客户端断开» 事件。
识别客户端后,您可以使用客户端连接(通道)映射将适当的消息发送给客户端(当前发送客户端除外)。
此外,我想建议您使用 Netty 找到一个很好的聊天应用程序实现并看一看。
Netty 特定的解决方案
让我们考虑服务器端实现,特别是 ProcessingHandler
class.
它已经通过将活动频道表示为频道组来管理它们:
static final ChannelGroup channels1 =
new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
使频道组保持最新
当前的实现处理 «channel becomes active» 事件以使频道组保持最新:
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
channels1.add(ctx.channel());
// ...
}
但这只是一半:还需要对称地处理 «channel becomes inactive» 事件。实施应如下所示:
@Override
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
channels1.remove(ctx.channel());
}
广播:将收到的消息发送到所有频道,当前频道除外
要实现所需的行为,只需通过引入适当的检查来更新实现,如下所示:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// ...
for (Channel ch : channels1) {
// Does `ch` represent the channel of the current sending client?
if (ch.equals(ctx.channel())) {
// Skip.
continue;
}
// Send the message to the `ch` channel.
// ...
}
// ...
}
发送和接收字符串问题
Currently,ResponseData
class 的功能不存在(未实现)。
需要进行以下 草稿 更改才能使客户端和服务器都正常工作。
ResponseData
class:getStringValue
和toString
方法应该更正:String getStringValue() { return this.strValue; } @Override public String toString() { return intValue + ";" + strValue; }
ResponseDataEncoder
class:应该使用字符串值:private final Charset charset = Charset.forName("UTF-8"); @Override protected void encode(final ChannelHandlerContext ctx, final ResponseData msg, final ByteBuf out) throws Exception { out.writeInt(msg.getIntValue()); out.writeInt(msg.getStringValue().length()); out.writeCharSequence(msg.getStringValue(), charset); }
ResponseDataDecoder
class:应该使用字符串值:private final Charset charset = Charset.forName("UTF-8"); @Override protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) throws Exception { ResponseData data = new ResponseData(); data.setIntValue(in.readInt()); int strLen = in.readInt(); data.setStringValue(in.readCharSequence(strLen, charset).toString()); out.add(data); }
ClientHandler
class:应该正确接收和处理消息:@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { final ResponseData responseData = (ResponseData) msg; System.out.println("The message sent from the server " + responseData); update.accept(responseData.getIntValue()); }
其他参考文献
- «SecureChat ‐ an TLS-based chat server, derived from the Telnet example», Netty Documentation. In particular, the implementation of the
SecureChatServerHandler
class. - «Netty in Action», Norman Maurer, Marvin Allen Wolfthal (ISBN-13: 978-1617291470),«第 3 部分 — 网络协议»,«12.2 我们的示例 WebSocket 应用程序» 子章。涵盖«基于浏览器的聊天应用程序»的实现。