带有 Netty TCP 服务器的 Project Reactor - 解码消息时 ByteBuff 大小停留在 1024
Project Reactor with Netty TCP Server - ByteBuff Size stuck at 1024 when Decoding Messages
我们正在使用 Spring 引导版本 1.2.8.RELEASE
,它与 Reactor 版本 1.1.6.RELEASE
具有 org.projectoreactor.*
的托管依赖关系。
我面临的问题是在我的自定义编解码器 (reactor.io.encoding.Codec
) 中,给定的缓冲区 (reactor.io.Buffer
) 的上限为 1024 字节,但我的消息超出了该限制。当我尝试解码消息时,它不是完整的消息(只是部分消息)并且我的解码失败,因为它期望完整的消息。
问题一:如何增加 Buffer bytes
(reactor.io.Buffer
) 以便我的应用函数正常工作?下面的简单示例:
public class StringDecoder implements Function<Buffer, String> {
// Buffer is limited to 1024 but the message the client sent
// was 2k
@Override
public String apply(Buffer bytes) {
return bytes.toString();
}
}
问题二:如何将应用函数(如上)分块?这意味着当 Netty 的缓冲区达到其限制时,我的应用函数可以创建自己的缓冲区(并管理缓冲区),最终我可以解码消息并且 Reactory/Netty 可以将其传递给消费者。
注意:在我的 "main" 方法中,以下用于设置环境。 Netty 服务器在 Windows 下 运行,客户端在 linux 上。这与 TCP 的 Windows 实施有关吗?
// NOTES: ServerSocketOptions sets the max buffer for send and receive to
// something much larger than 1024. Verified with debugger
TcpServerSpec<String, String> spec = new TcpServerSpec<String, String>(NettyTcpServer.class);
spec.env(env);
spec.listen(port);
spec.dispatcher("sync");
spec.codec(new AgentCodec());
spec.consume(connectionHandler(handler));
TcpServer<String, String> tcp = spec.get();
tcp.start().await();
好的,在调试代码后我发现了问题所在 -- 顺便说一句,我在 Reactor 项目 (tisk tisk) 上找到的任何文档中都找不到这个问题。
apply 函数会随着更多的消息不断被调用。所以如果消息是 2K,apply 将被调用两次。第一次用 1K,第二次用 2K。
注意:如果您不推进缓冲区位置,它将附加到缓冲区。如果你前进了缓冲区位置,它会在再次调用 apply 时被删除。
我们正在使用 Spring 引导版本 1.2.8.RELEASE
,它与 Reactor 版本 1.1.6.RELEASE
具有 org.projectoreactor.*
的托管依赖关系。
我面临的问题是在我的自定义编解码器 (reactor.io.encoding.Codec
) 中,给定的缓冲区 (reactor.io.Buffer
) 的上限为 1024 字节,但我的消息超出了该限制。当我尝试解码消息时,它不是完整的消息(只是部分消息)并且我的解码失败,因为它期望完整的消息。
问题一:如何增加 Buffer bytes
(reactor.io.Buffer
) 以便我的应用函数正常工作?下面的简单示例:
public class StringDecoder implements Function<Buffer, String> {
// Buffer is limited to 1024 but the message the client sent
// was 2k
@Override
public String apply(Buffer bytes) {
return bytes.toString();
}
}
问题二:如何将应用函数(如上)分块?这意味着当 Netty 的缓冲区达到其限制时,我的应用函数可以创建自己的缓冲区(并管理缓冲区),最终我可以解码消息并且 Reactory/Netty 可以将其传递给消费者。
注意:在我的 "main" 方法中,以下用于设置环境。 Netty 服务器在 Windows 下 运行,客户端在 linux 上。这与 TCP 的 Windows 实施有关吗?
// NOTES: ServerSocketOptions sets the max buffer for send and receive to
// something much larger than 1024. Verified with debugger
TcpServerSpec<String, String> spec = new TcpServerSpec<String, String>(NettyTcpServer.class);
spec.env(env);
spec.listen(port);
spec.dispatcher("sync");
spec.codec(new AgentCodec());
spec.consume(connectionHandler(handler));
TcpServer<String, String> tcp = spec.get();
tcp.start().await();
好的,在调试代码后我发现了问题所在 -- 顺便说一句,我在 Reactor 项目 (tisk tisk) 上找到的任何文档中都找不到这个问题。
apply 函数会随着更多的消息不断被调用。所以如果消息是 2K,apply 将被调用两次。第一次用 1K,第二次用 2K。
注意:如果您不推进缓冲区位置,它将附加到缓冲区。如果你前进了缓冲区位置,它会在再次调用 apply 时被删除。