Camel 和 Netty:客户端——中间应用——服务器
Camel and Netty: client - intermediary app - server
我正在使用 Camel 和 Netty 设置一个场景,其中客户端连接到服务器,中间有一个应用程序(只是一个名为 Router 的虚拟应用程序)。
SocketClient连接到Router(端口53379),Router连接Server(端口53383)。问题是数据包永远不会到达服务器(但它确实到达了路由器,我用处理器调试了它)。
如果我将 SocketClient 连接到服务器,它工作正常。
感谢任何帮助。
完整的来源
路由器:
public class Router {
public static void main(String[] args) throws Exception {
SimpleRegistry registry = new SimpleRegistry();
registry.put("ByteArrayEncoder", new ByteArrayEncoder());
registry.put("ByteArrayDecoder", new ByteArrayDecoder());
CamelContext context = new DefaultCamelContext(registry);
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("netty4:tcp://localhost:53379?encoders=#ByteArrayEncoder&"
+ "decoders=#ByteArrayDecoder"
+ "&sync=true"
+ "&keepAlive=true")
.to("netty4:tcp://localhost:53383?encoders=#ByteArrayEncoder&"
+ "decoders=#ByteArrayDecoder"
+ "&sync=true"
+ "&keepAlive=true");
}
});
context.start();
while(true) {
Thread.sleep(1000);
}
}
}
服务器:
public class Server {
public static void main(String[] args) throws Exception {
SimpleRegistry registry = new SimpleRegistry();
registry.put("ByteArrayEncoder", new ByteArrayEncoder());
registry.put("ByteArrayDecoder", new ByteArrayDecoder());
CamelContext context = new DefaultCamelContext(registry);
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("netty4:tcp://localhost:53383?encoders=#ByteArrayEncoder&"
+ "decoders=#ByteArrayDecoder"
+ "&sync=true"
+ "&keepAlive=true")
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
Message message = exchange.getIn();
System.out.println("from este_stub: " + message.getBody());
exchange.setOut(message);
}
});
}
});
context.start();
while(true) {
Thread.sleep(1000);
}
}
}
客户:
public class SocketClient {
public static void main(String[] args) throws UnknownHostException, IOException {
Socket socket = new Socket("localhost", 53379);
OutputStream simOutStream = socket.getOutputStream();
BufferedInputStream simInStream = new BufferedInputStream(socket.getInputStream());
byte[] arr = new byte[] {1, 2, 3, 4, 5};
simOutStream.write(arr, 0, arr.length);
simOutStream.flush();
byte[] resp = new byte[5];
simInStream.read(resp, 0, resp.length);
for(byte ar : resp)
System.out.print(ar);
socket.close();
}
}
运行 您共享的代码在路由器中出现以下错误 class io.netty.channel.ChannelPipelineException: io.netty.handler.codec.bytes.ByteArrayDecoder is not a @Sharable handler, so can't be added or removed multiple times.
如异常所述,ByteArrayDecoder 以及 ByteArrayEncoder 不是可共享的处理程序,因此是错误的原因。从 Sharable 的 javadoc 中,它清楚地指出
Indicates that the same instance of the annotated ChannelHandler can be added to one or more ChannelPipeline multiple times without a race condition. If this annotation is not specified, you have to create a new handler instance every time you add it to a pipeline because it has unshared state such as member variables.
netty4 的 camel 文档也在 Camel netty4
中说明了这一点
如下:
If your encoders or decoders is not shareable (eg they have the @Shareable class annotation), then your encoder/decoder must implement the org.apache.camel.component.netty.ChannelHandlerFactory interface, and return a new instance in the newChannelHandler method. This is to ensure the encoder/decoder can safely be used. If this is not the case, then the Netty component will log a WARN when
an endpoint is created.
The Netty component offers a org.apache.camel.component.netty.ChannelHandlerFactories factory class, that has a number of commonly used methods.
因此,您的问题已通过使用一些自定义编码器和解码器实现 ChannelHandlerFactory 并覆盖 newChannelHandler 来解决,如下所示:
public class CustomByteArrayDecoder 实现 ChannelHandlerFactory {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
}
@Override
public ChannelHandler newChannelHandler() {
return new ByteArrayDecoder();
}
编码器也是如此。然后在路由器和服务器中进行如下适当的更改:
SimpleRegistry registry = new SimpleRegistry();
registry.put("ByteArrayEncoder", new CustomByteArrayEncoder());
registry.put("ByteArrayDecoder", new CustomByteArrayDecoder());
我正在使用 Camel 和 Netty 设置一个场景,其中客户端连接到服务器,中间有一个应用程序(只是一个名为 Router 的虚拟应用程序)。
SocketClient连接到Router(端口53379),Router连接Server(端口53383)。问题是数据包永远不会到达服务器(但它确实到达了路由器,我用处理器调试了它)。
如果我将 SocketClient 连接到服务器,它工作正常。
感谢任何帮助。
完整的来源
路由器:
public class Router {
public static void main(String[] args) throws Exception {
SimpleRegistry registry = new SimpleRegistry();
registry.put("ByteArrayEncoder", new ByteArrayEncoder());
registry.put("ByteArrayDecoder", new ByteArrayDecoder());
CamelContext context = new DefaultCamelContext(registry);
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("netty4:tcp://localhost:53379?encoders=#ByteArrayEncoder&"
+ "decoders=#ByteArrayDecoder"
+ "&sync=true"
+ "&keepAlive=true")
.to("netty4:tcp://localhost:53383?encoders=#ByteArrayEncoder&"
+ "decoders=#ByteArrayDecoder"
+ "&sync=true"
+ "&keepAlive=true");
}
});
context.start();
while(true) {
Thread.sleep(1000);
}
}
}
服务器:
public class Server {
public static void main(String[] args) throws Exception {
SimpleRegistry registry = new SimpleRegistry();
registry.put("ByteArrayEncoder", new ByteArrayEncoder());
registry.put("ByteArrayDecoder", new ByteArrayDecoder());
CamelContext context = new DefaultCamelContext(registry);
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("netty4:tcp://localhost:53383?encoders=#ByteArrayEncoder&"
+ "decoders=#ByteArrayDecoder"
+ "&sync=true"
+ "&keepAlive=true")
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
Message message = exchange.getIn();
System.out.println("from este_stub: " + message.getBody());
exchange.setOut(message);
}
});
}
});
context.start();
while(true) {
Thread.sleep(1000);
}
}
}
客户:
public class SocketClient {
public static void main(String[] args) throws UnknownHostException, IOException {
Socket socket = new Socket("localhost", 53379);
OutputStream simOutStream = socket.getOutputStream();
BufferedInputStream simInStream = new BufferedInputStream(socket.getInputStream());
byte[] arr = new byte[] {1, 2, 3, 4, 5};
simOutStream.write(arr, 0, arr.length);
simOutStream.flush();
byte[] resp = new byte[5];
simInStream.read(resp, 0, resp.length);
for(byte ar : resp)
System.out.print(ar);
socket.close();
}
}
运行 您共享的代码在路由器中出现以下错误 class io.netty.channel.ChannelPipelineException: io.netty.handler.codec.bytes.ByteArrayDecoder is not a @Sharable handler, so can't be added or removed multiple times.
如异常所述,ByteArrayDecoder 以及 ByteArrayEncoder 不是可共享的处理程序,因此是错误的原因。从 Sharable 的 javadoc 中,它清楚地指出
Indicates that the same instance of the annotated ChannelHandler can be added to one or more ChannelPipeline multiple times without a race condition. If this annotation is not specified, you have to create a new handler instance every time you add it to a pipeline because it has unshared state such as member variables.
netty4 的 camel 文档也在 Camel netty4
中说明了这一点如下:
If your encoders or decoders is not shareable (eg they have the @Shareable class annotation), then your encoder/decoder must implement the org.apache.camel.component.netty.ChannelHandlerFactory interface, and return a new instance in the newChannelHandler method. This is to ensure the encoder/decoder can safely be used. If this is not the case, then the Netty component will log a WARN when an endpoint is created. The Netty component offers a org.apache.camel.component.netty.ChannelHandlerFactories factory class, that has a number of commonly used methods.
因此,您的问题已通过使用一些自定义编码器和解码器实现 ChannelHandlerFactory 并覆盖 newChannelHandler 来解决,如下所示:
public class CustomByteArrayDecoder 实现 ChannelHandlerFactory {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
}
@Override
public ChannelHandler newChannelHandler() {
return new ByteArrayDecoder();
}
编码器也是如此。然后在路由器和服务器中进行如下适当的更改:
SimpleRegistry registry = new SimpleRegistry();
registry.put("ByteArrayEncoder", new CustomByteArrayEncoder());
registry.put("ByteArrayDecoder", new CustomByteArrayDecoder());