Scala Netty 有什么方法可以共享 ReplayingDecoder

Scala Netty is there any way to share a ReplayingDecoder

我希望使用 netty 客户端 bootstrap 打开多个连接,以便解析来自多个来源的消息。这些消息都具有相同的格式,但是,由于需要处理的数据量,我必须 运行 每个连接在单独的线程上(这是假设 netty 在每个客户端通道创建一个线程,我不能这样做'找不到参考 - 如果不是这样,这将如何实现?)。

这是我用来连接数据服务器的代码:

var b = new Bootstrap()
        .group(group)
        .channel(classOf[NioSocketChannel])
        .handler(RawFeedChannelInitializer)


var ch1 = b.clone().connect(host, port).sync().channel();
var ch2 = b.clone().connect(host, port).sync().channel();

初始化程序调用 RawPacketDecoder,它扩展了 ReplayingDecoder,并被定义为 here。 打开单个连接时,代码在没有 @Sharable 的情况下运行良好,但出于我的应用程序的目的,我必须多次连接到同一台服务器。

这导致 运行时间错误 @Sharable annotation is not allowed 指向我的 RawPacketDecoder class。

我不完全确定如何解决这个问题,除了在 scala 中重新实现 ReplayingDecoder 的可实例化 class 作为我直接基于 ByteToMessageDecoder 的解码器。

如有任何帮助,我们将不胜感激。

注意:我使用的是netty 4.0.32 Final

我在 this StockExchange answer 中找到了解决方案。

我的问题是我使用的是基于对象的 ChannelInitializer(单例),ReplayingDecoderByteToMessageDecoder 不可共享。

我的初始值设定项是作为 Scala 对象创建的,因此允许单个实例。将初始化程序更改为 scala class 并为每个 bootstrap 克隆实例化解决了这个问题。我将上面的bootstrap代码修改如下:

var b = new Bootstrap()
    .group(group)
    .channel(classOf[NioSocketChannel])
    //.handler(RawFeedChannelInitializer)

var ch1 = b.clone().handler(new RawFeedChannelInitializer()).connect(host, port).sync().channel();
var ch2 = b.clone().handler(new RawFeedChannelInitializer()).connect(host, port).sync().channel();

我不确定这是否能确保多线程,但它确实允许将数据访问分成多个连接到源服务器。

编辑更新: 在对该主题进行额外研究后,我确定 netty 实际上确实为每个通道创建了一个线程;这是通过在创建每个通道后打印到控制台来验证的:

println("No. of active threads: " + Thread.activeCount());

随着通道的创建和与其各自线程的关联,输出显示了一个递增的数字。

默认情况下,NioEventLoopGroup 使用 2*Num_CPU_cores 个定义的线程 here:

DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
              "io.netty.eventLoopThreads",
               Runtime.getRuntime().availableProcessors() * 2));

可以通过设置

将此值覆盖为其他值
val group = new NioEventLoopGroup(16)

然后使用群组 create/setup bootstrap。