如何将 netty 解码器中的消息发送到管道中的下一个 handler/decoder?
How to send a message inside a netty decoder to the next handler/decoder in the pipeline?
我的通道管道包含多个解码器,它们都对 TextWebSocketFrame 消息进行操作。现在我的问题是,我必须根据消息的某些内容选择正确的解码器。
本质上,我必须解析消息中的某个字段,然后决定是继续处理消息还是将消息传递给下一个encoder/handler。
大多数人建议在这种情况下使用单个解码器来解码所有消息,但我的问题是一些解码器是动态添加的,将所有逻辑放在一个解码器中会很乱。
目前代码如下所示:
@Override
protected void decode(ChannelHandlerContext ctx, TextWebSocketFrame msg, List<Object> out) throws Exception {
String messageAsJson = msg.text();
JsonObject jsonObject = JSON_PARSER.fromJson(messageAsJson, JsonObject.class);
JsonObject messageHeader = jsonObject.getAsJsonObject(MESSAGE_HEADER_FIELD);
String serviceAsString = messageHeader.get(MESSAGE_SERVICE_FIELD).getAsString();
String inboundTypeAsString = messageHeader.get(MESSAGE_TYPE_FIELD).getAsString();
Service service = JSON_PARSER.fromJson(serviceAsString, Service.class);
InboundType inboundType = JSON_PARSER.fromJson(inboundTypeAsString, InboundType.class);
if (service == Service.STREAMING) {
out.add(decodeQuotesMessage(inboundType, messageAsJson));
} else {
}
}
所以基本上我需要在 else 分支中使用一些逻辑来将消息传递给管道中的下一个处理程序。
我知道,这种方法不是最有效的方法,但我的服务架构有一个慢路径(运行 在不同的线程池上),包括这个逻辑和一个快速路径。因此我可以在这个地方接受一些慢代码。
一般来说,你需要这样的东西:
if (service == Service.STREAMING) {
ctx.pipeline().addLast(new StreamingHandler());
} else {
ctx.pipeline().addLast(new OtherHandler());
}
out.add(decodeQuotesMessage(inboundType, messageAsJson));
ctx.pipeline().remove(this);
后面的逻辑是:
- 你解码了header,你现在知道你需要遵循什么流程了;
- 您根据您的 header;
向管道添加特定的处理程序
- 您将解码后的消息添加到 'out' 列表,因此您说 "send this decoded message to next handler in pipeline, that would be handler defined in step 2 in case current handler is last in pipeline";
- 您从管道中删除当前处理程序以避免处理程序重复,以防您的协议将一次又一次地发送相同的 header。但是,这是特定于您的协议的步骤,可能不是必需的。
这只是一般方法,但是,它实际上取决于您的协议流程。
我的通道管道包含多个解码器,它们都对 TextWebSocketFrame 消息进行操作。现在我的问题是,我必须根据消息的某些内容选择正确的解码器。
本质上,我必须解析消息中的某个字段,然后决定是继续处理消息还是将消息传递给下一个encoder/handler。
大多数人建议在这种情况下使用单个解码器来解码所有消息,但我的问题是一些解码器是动态添加的,将所有逻辑放在一个解码器中会很乱。
目前代码如下所示:
@Override
protected void decode(ChannelHandlerContext ctx, TextWebSocketFrame msg, List<Object> out) throws Exception {
String messageAsJson = msg.text();
JsonObject jsonObject = JSON_PARSER.fromJson(messageAsJson, JsonObject.class);
JsonObject messageHeader = jsonObject.getAsJsonObject(MESSAGE_HEADER_FIELD);
String serviceAsString = messageHeader.get(MESSAGE_SERVICE_FIELD).getAsString();
String inboundTypeAsString = messageHeader.get(MESSAGE_TYPE_FIELD).getAsString();
Service service = JSON_PARSER.fromJson(serviceAsString, Service.class);
InboundType inboundType = JSON_PARSER.fromJson(inboundTypeAsString, InboundType.class);
if (service == Service.STREAMING) {
out.add(decodeQuotesMessage(inboundType, messageAsJson));
} else {
}
}
所以基本上我需要在 else 分支中使用一些逻辑来将消息传递给管道中的下一个处理程序。
我知道,这种方法不是最有效的方法,但我的服务架构有一个慢路径(运行 在不同的线程池上),包括这个逻辑和一个快速路径。因此我可以在这个地方接受一些慢代码。
一般来说,你需要这样的东西:
if (service == Service.STREAMING) {
ctx.pipeline().addLast(new StreamingHandler());
} else {
ctx.pipeline().addLast(new OtherHandler());
}
out.add(decodeQuotesMessage(inboundType, messageAsJson));
ctx.pipeline().remove(this);
后面的逻辑是:
- 你解码了header,你现在知道你需要遵循什么流程了;
- 您根据您的 header; 向管道添加特定的处理程序
- 您将解码后的消息添加到 'out' 列表,因此您说 "send this decoded message to next handler in pipeline, that would be handler defined in step 2 in case current handler is last in pipeline";
- 您从管道中删除当前处理程序以避免处理程序重复,以防您的协议将一次又一次地发送相同的 header。但是,这是特定于您的协议的步骤,可能不是必需的。
这只是一般方法,但是,它实际上取决于您的协议流程。