使用 ServerInitializerFactory Netty 4 时消息未到达目标队列
Messages not reaching destination queue when using ServerInitializerFactory Netty 4
我在 grails 中使用 apache camel netty4 并且我声明了 mycustom ServerInitializerFactory 如下
public class MyServerInitializerFactory extends ServerInitializerFactory {
private int maxLineSize = 1048576;
NettyConsumer nettyConsumer
public MimacsServerInitializerFactory() {}
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline()
pipeline.addLast("logger", new LoggingHandler(LogLevel.INFO))
pipeline.addLast("framer", new LengthFieldBasedFrameDecoder(ByteOrder.LITTLE_ENDIAN, maxLineSize, 2, 2, 6, 0, false))
pipeline.addLast("decoder", new MfuDecoder())
pipeline.addLast("encoder", new MfuEncoder())
pipeline.addLast("handler", new MyServerHandler())
}
}
我有一条路线,我在我的路线构建器中设置如下。
from('netty4:tcp://192.168.254.3:553?serverInitializerFactory=#sif&keepAlive=true&sync=true&allowDefaultCodec=false').to('activemq:queue:Tracking.Queue')
我的 Camel 上下文在 BootStrap.groovy 中设置如下
def serverInitializerFactory = new MyServerInitializerFactory()
SimpleRegistry registry = new SimpleRegistry()
registry.put("sif", serverInitializerFactory)
CamelContext camelContext = new DefaultCamelContext(registry)
camelContext.addComponent("activemq", activeMQComponent.activeMQComponent("failover:tcp://localhost:61616"))
camelContext.addRoutes new TrackingMessageRoute()
camelContext.start()
当我 运行 我的应用程序时,我的路由启动并且我的成帧器、解码器、处理程序和编码器都被调用,但消息没有到达跟踪。队列和响应不会返回给客户端。
如果我不在 netty url 中使用 serverInitializerFactory
而是使用用户编码器和解码器,我的消息会进入队列,但我无法控制要为每个消息发送的确认我收到的消息类型。似乎 activemq
试图发送自己的响应,但被我的编码器拒绝了。
然后我应该编写代码再次发送还是我遗漏了什么?
您需要为消费者添加一个处理程序,以便它可以被路由,查看单元测试是如何完成的:
我设法解决了这个问题。在我的 channelRead0 方法中。我添加了以下行
Exchange exchange = this.consumer.getEndpoint().createExchange(ctx, msg);
其中ctx是ChannelContextHandler,msg是Message Object,两者都是channelRead0方法的参数
我还添加了以下几行
this.consumer.createUoW(exchange);
在处理代码后,我插入了以下行
this.consumer.doneUoW(exchange);
一切都很顺利。
我在 grails 中使用 apache camel netty4 并且我声明了 mycustom ServerInitializerFactory 如下
public class MyServerInitializerFactory extends ServerInitializerFactory {
private int maxLineSize = 1048576;
NettyConsumer nettyConsumer
public MimacsServerInitializerFactory() {}
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline()
pipeline.addLast("logger", new LoggingHandler(LogLevel.INFO))
pipeline.addLast("framer", new LengthFieldBasedFrameDecoder(ByteOrder.LITTLE_ENDIAN, maxLineSize, 2, 2, 6, 0, false))
pipeline.addLast("decoder", new MfuDecoder())
pipeline.addLast("encoder", new MfuEncoder())
pipeline.addLast("handler", new MyServerHandler())
}
}
我有一条路线,我在我的路线构建器中设置如下。
from('netty4:tcp://192.168.254.3:553?serverInitializerFactory=#sif&keepAlive=true&sync=true&allowDefaultCodec=false').to('activemq:queue:Tracking.Queue')
我的 Camel 上下文在 BootStrap.groovy 中设置如下
def serverInitializerFactory = new MyServerInitializerFactory()
SimpleRegistry registry = new SimpleRegistry()
registry.put("sif", serverInitializerFactory)
CamelContext camelContext = new DefaultCamelContext(registry)
camelContext.addComponent("activemq", activeMQComponent.activeMQComponent("failover:tcp://localhost:61616"))
camelContext.addRoutes new TrackingMessageRoute()
camelContext.start()
当我 运行 我的应用程序时,我的路由启动并且我的成帧器、解码器、处理程序和编码器都被调用,但消息没有到达跟踪。队列和响应不会返回给客户端。
如果我不在 netty url 中使用 serverInitializerFactory
而是使用用户编码器和解码器,我的消息会进入队列,但我无法控制要为每个消息发送的确认我收到的消息类型。似乎 activemq
试图发送自己的响应,但被我的编码器拒绝了。
然后我应该编写代码再次发送还是我遗漏了什么?
您需要为消费者添加一个处理程序,以便它可以被路由,查看单元测试是如何完成的:
我设法解决了这个问题。在我的 channelRead0 方法中。我添加了以下行
Exchange exchange = this.consumer.getEndpoint().createExchange(ctx, msg);
其中ctx是ChannelContextHandler,msg是Message Object,两者都是channelRead0方法的参数
我还添加了以下几行
this.consumer.createUoW(exchange);
在处理代码后,我插入了以下行
this.consumer.doneUoW(exchange);
一切都很顺利。