Camel and Netty: client - intermediary app - server

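I'm using Camel and Netty to set up a scenario in which a client connects to a server with an application in between (just a dummy app called Router).

SocketClient connects to Router (port 53379), and Router connects to Server (port 53383). The problem is that the packet never reaches the Server (it does reach the Router; I confirmed that by debugging with a Processor).

If I connect SocketClient directly to the Server, it works fine.

Any help is appreciated.

Full source

Router: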

public class Router {
    public static void main(String[] args) throws Exception {
        SimpleRegistry registry = new SimpleRegistry();
        registry.put("ByteArrayEncoder", new ByteArrayEncoder());
        registry.put("ByteArrayDecoder", new ByteArrayDecoder());
        
        CamelContext context = new DefaultCamelContext(registry);
        
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("netty4:tcp://localhost:53379?encoders=#ByteArrayEncoder&"
                        + "decoders=#ByteArrayDecoder"
                        + "&sync=true"
                        + "&keepAlive=true")
                .to("netty4:tcp://localhost:53383?encoders=#ByteArrayEncoder&"
                        + "decoders=#ByteArrayDecoder"
                        + "&sync=true"
                        + "&keepAlive=true");               
            }
        });

        context.start();
        while(true) {
            Thread.sleep(1000);
        }
    }
}

Server:

public class Server {
    public static void main(String[] args) throws Exception {
        SimpleRegistry registry = new SimpleRegistry();
        registry.put("ByteArrayEncoder", new ByteArrayEncoder());
        registry.put("ByteArrayDecoder", new ByteArrayDecoder());
        
        CamelContext context = new DefaultCamelContext(registry);
        
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("netty4:tcp://localhost:53383?encoders=#ByteArrayEncoder&"
                        + "decoders=#ByteArrayDecoder"
                        + "&sync=true"
                        + "&keepAlive=true")
                .process(new Processor() {
                    public void process(Exchange exchange) throws Exception {
                        Message message = exchange.getIn();
                        System.out.println("from este_stub: " + message.getBody());
                        exchange.setOut(message);
                    }
                });
            }
        });

        context.start();
        while(true) {
            Thread.sleep(1000);
        }
    }
}

Client:

public class SocketClient {
    public static void main(String[] args) throws UnknownHostException, IOException {
        Socket socket = new Socket("localhost", 53379);
        OutputStream simOutStream = socket.getOutputStream();
        BufferedInputStream simInStream = new BufferedInputStream(socket.getInputStream());        
        
        byte[] arr = new byte[] {1, 2, 3, 4, 5};
        simOutStream.write(arr, 0, arr.length);
        simOutStream.flush();
        
        
        byte[] resp = new byte[5];
        simInStream.read(resp, 0, resp.length);
        for(byte ar : resp)
            System.out.print(ar);
        
        socket.close();
    }
}

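Running the code you shared produces the following error in the Router:

class io.netty.channel.ChannelPipelineException: io.netty.handler.codec.bytes.ByteArrayDecoder is not a @Sharable handler, so can't be added or removed multiple times.

As the exception says, ByteArrayDecoder (and likewise ByteArrayEncoder) is not a sharable handler, and that is the cause of the error. The Router registers a single instance of each and references it from both the from and the to endpoint, so the same instance ends up being added to more than one pipeline. The javadoc for Sharable states this clearly: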

Indicates that the same instance of the annotated ChannelHandler can be added to one or more ChannelPipeline multiple times without a race condition. If this annotation is not specified, you have to create a new handler instance every time you add it to a pipeline because it has unshared state such as member variables.

The Camel documentation for the netty4 component says the same:

If your encoders or decoders is not shareable (eg they have the @Shareable class annotation), then your encoder/decoder must implement the org.apache.camel.component.netty.ChannelHandlerFactory interface, and return a new instance in the newChannelHandler method. This is to ensure the encoder/decoder can safely be used. If this is not the case, then the Netty component will log a WARN when an endpoint is created. The Netty component offers a org.apache.camel.component.netty.ChannelHandlerFactories factory class, that has a number of commonly used methods.

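So your problem is solved by writing custom encoder and decoder classes that implement ChannelHandlerFactory and override newChannelHandler, as follows:

// Package names below assume the camel-netty4 component on Netty 4.
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import org.apache.camel.component.netty4.ChannelHandlerFactory;

public class CustomByteArrayDecoder implements ChannelHandlerFactory {

    // The factory itself is never meant to handle events, so the
    // ChannelHandler callbacks are intentionally left empty.
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    }

    // Called for each new pipeline, so every pipeline gets its own decoder.
    @Override
    public ChannelHandler newChannelHandler() {
        return new ByteArrayDecoder();
    }
}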

Do the same for the encoder. Then make the corresponding changes in the Router and the Server:

SimpleRegistry registry = new SimpleRegistry();
registry.put("ByteArrayEncoder", new CustomByteArrayEncoder());
registry.put("ByteArrayDecoder", new CustomByteArrayDecoder());
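
To illustrate why registering a factory instead of a handler instance avoids the exception, here is a minimal, dependency-free sketch. It is only a model: Handler, Pipeline and openConnections are hypothetical stand-ins written for this example, not Netty or Camel classes, and the check in addLast only approximates the rule that the exception message describes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class HandlerFactoryDemo {

    /** Hypothetical stand-in for a handler that is not sharable. */
    static class Handler {
        private boolean added; // set once the handler has joined a pipeline
    }

    /** Hypothetical stand-in for a channel pipeline; one is built per connection. */
    static class Pipeline {
        private final List<Handler> handlers = new ArrayList<>();

        void addLast(Handler handler) {
            // Approximates the rule from the exception message:
            // a non-sharable instance may only ever be added once.
            if (handler.added) {
                throw new IllegalStateException(
                        "handler is not sharable, so can't be added multiple times");
            }
            handler.added = true;
            handlers.add(handler);
        }
    }

    /**
     * Builds one pipeline per connection, asking handlerSource for the handler
     * each time, and returns how many pipelines were built successfully.
     */
    static int openConnections(int connections, Supplier<Handler> handlerSource) {
        int opened = 0;
        for (int i = 0; i < connections; i++) {
            try {
                new Pipeline().addLast(handlerSource.get());
                opened++;
            } catch (IllegalStateException e) {
                System.out.println("connection " + i + " failed: " + e.getMessage());
            }
        }
        return opened;
    }

    public static void main(String[] args) {
        // Registering an instance: every pipeline receives the same object.
        Handler shared = new Handler();
        System.out.println("shared instance: " + openConnections(2, () -> shared));

        // Registering a factory: every pipeline receives a fresh object.
        System.out.println("factory: " + openConnections(2, Handler::new));
    }
}
```

In this model the shared instance only makes it into the first pipeline, while the factory variant succeeds for both. That corresponds to the Router's situation, where the consumer and producer endpoints each need their own encoder and decoder.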