使用 ServerInitializerFactory Netty 4 时消息未到达目标队列

Messages not reaching destination queue when using ServerInitializerFactory Netty 4

我在 grails 中使用 apache camel netty4 并且我声明了 mycustom ServerInitializerFactory 如下

public class MyServerInitializerFactory extends ServerInitializerFactory {
    private int maxLineSize = 1048576;
    NettyConsumer nettyConsumer

    public MimacsServerInitializerFactory() {}

    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline()
        pipeline.addLast("logger", new LoggingHandler(LogLevel.INFO))
        pipeline.addLast("framer", new LengthFieldBasedFrameDecoder(ByteOrder.LITTLE_ENDIAN, maxLineSize, 2, 2, 6, 0, false))
        pipeline.addLast("decoder", new MfuDecoder())
        pipeline.addLast("encoder", new MfuEncoder())
        pipeline.addLast("handler", new MyServerHandler())
    }
}

我有一条路线,我在我的路线构建器中设置如下。

from('netty4:tcp://192.168.254.3:553?serverInitializerFactory=#sif&keepAlive=true&sync=true&allowDefaultCodec=false').to('activemq:queue:Tracking.Queue')

我的 Camel 上下文在 BootStrap.groovy 中设置如下

def serverInitializerFactory = new MyServerInitializerFactory()
SimpleRegistry registry = new SimpleRegistry()
registry.put("sif", serverInitializerFactory)

CamelContext camelContext = new DefaultCamelContext(registry)
camelContext.addComponent("activemq",  activeMQComponent.activeMQComponent("failover:tcp://localhost:61616"))
camelContext.addRoutes new TrackingMessageRoute()
camelContext.start()

当我 运行 我的应用程序时,我的路由启动并且我的成帧器、解码器、处理程序和编码器都被调用,但消息没有到达跟踪。队列和响应不会返回给客户端。

如果我不在 netty url 中使用 serverInitializerFactory 而是使用用户编码器和解码器,我的消息会进入队列,但我无法控制要为每个消息发送的确认我收到的消息类型。似乎 activemq 试图发送自己的响应,但被我的编码器拒绝了。

然后我应该编写代码再次发送还是我遗漏了什么?

您需要为消费者添加一个处理程序,以便它可以被路由,查看单元测试是如何完成的:

我设法解决了这个问题。在我的 channelRead0 方法中。我添加了以下行

Exchange exchange = this.consumer.getEndpoint().createExchange(ctx, msg);

其中ctx是ChannelContextHandler,msg是Message Object,两者都是channelRead0方法的参数

我还添加了以下几行

this.consumer.createUoW(exchange);

在处理代码后,我插入了以下行

this.consumer.doneUoW(exchange);

一切都很顺利。