带有 Netty TCP 服务器的 Project Reactor - 解码消息时 ByteBuff 大小停留在 1024

Project Reactor with Netty TCP Server - ByteBuff Size stuck at 1024 when Decoding Messages

我们正在使用 Spring 引导版本 1.2.8.RELEASE,它与 Reactor 版本 1.1.6.RELEASE 具有 org.projectoreactor.* 的托管依赖关系。

我面临的问题是在我的自定义编解码器 (reactor.io.encoding.Codec) 中,给定的缓冲区 (reactor.io.Buffer) 的上限为 1024 字节,但我的消息超出了该限制。当我尝试解码消息时,它不是完整的消息(只是部分消息)并且我的解码失败,因为它期望完整的消息。

问题一:如何增加 Buffer bytes (reactor.io.Buffer) 以便我的应用函数正常工作?下面的简单示例:

public class StringDecoder implements Function<Buffer, String> {

    // Buffer is limited to 1024 but the message the client sent
    // was 2k
    @Override
    public String apply(Buffer bytes) {
        return bytes.toString();
    }
}

问题二:如何将应用函数(如上)分块?这意味着当 Netty 的缓冲区达到其限制时,我的应用函数可以创建自己的缓冲区(并管理缓冲区),最终我可以解码消息并且 Reactory/Netty 可以将其传递给消费者。

注意:在我的 "main" 方法中,以下用于设置环境。 Netty 服务器在 Windows 下 运行,客户端在 linux 上。这与 TCP 的 Windows 实施有关吗?

// NOTES: ServerSocketOptions sets the max buffer for send and receive to 
// something much larger than 1024. Verified with debugger
TcpServerSpec<String, String> spec = new TcpServerSpec<String, String>(NettyTcpServer.class);
spec.env(env);
spec.listen(port);
spec.dispatcher("sync");
spec.codec(new AgentCodec());
spec.consume(connectionHandler(handler));

TcpServer<String, String> tcp = spec.get();
tcp.start().await();

好的,在调试代码后我发现了问题所在 -- 顺便说一句,我在 Reactor 项目 (tisk tisk) 上找到的任何文档中都找不到这个问题。

apply 函数会随着更多的消息不断被调用。所以如果消息是 2K,apply 将被调用两次。第一次用 1K,第二次用 2K。

注意:如果您不推进缓冲区位置,它将附加到缓冲区。如果你前进了缓冲区位置,它会在再次调用 apply 时被删除。