Scala Netty 有什么方法可以共享 ReplayingDecoder
Scala Netty is there any way to share a ReplayingDecoder
我希望使用 netty 客户端 bootstrap 打开多个连接,以便解析来自多个来源的消息。这些消息都具有相同的格式,但是,由于需要处理的数据量,我必须 运行 每个连接在单独的线程上(这是假设 netty 在每个客户端通道创建一个线程,我不能这样做'找不到参考 - 如果不是这样,这将如何实现?)。
这是我用来连接数据服务器的代码:
var b = new Bootstrap()
.group(group)
.channel(classOf[NioSocketChannel])
.handler(RawFeedChannelInitializer)
var ch1 = b.clone().connect(host, port).sync().channel();
var ch2 = b.clone().connect(host, port).sync().channel();
初始化程序调用 RawPacketDecoder
,它扩展了 ReplayingDecoder,并被定义为 here。
打开单个连接时,代码在没有 @Sharable
的情况下运行良好,但出于我的应用程序的目的,我必须多次连接到同一台服务器。
这导致 运行时间错误 @Sharable annotation is not allowed
指向我的 RawPacketDecoder
class。
我不完全确定如何解决这个问题,除了在 scala 中重新实现 ReplayingDecoder
的可实例化 class 作为我直接基于 ByteToMessageDecoder
的解码器。
如有任何帮助,我们将不胜感激。
注意:我使用的是netty 4.0.32 Final
我在 this StockExchange answer 中找到了解决方案。
我的问题是我使用的是基于对象的 ChannelInitializer(单例),ReplayingDecoder
和 ByteToMessageDecoder
不可共享。
我的初始值设定项是作为 Scala 对象创建的,因此允许单个实例。将初始化程序更改为 scala class 并为每个 bootstrap 克隆实例化解决了这个问题。我将上面的bootstrap代码修改如下:
var b = new Bootstrap()
.group(group)
.channel(classOf[NioSocketChannel])
//.handler(RawFeedChannelInitializer)
var ch1 = b.clone().handler(new RawFeedChannelInitializer()).connect(host, port).sync().channel();
var ch2 = b.clone().handler(new RawFeedChannelInitializer()).connect(host, port).sync().channel();
我不确定这是否能确保多线程,但它确实允许将数据访问分成多个连接到源服务器。
编辑更新: 在对该主题进行额外研究后,我确定 netty 实际上确实为每个通道创建了一个线程;这是通过在创建每个通道后打印到控制台来验证的:
println("No. of active threads: " + Thread.activeCount());
随着通道的创建和与其各自线程的关联,输出显示了一个递增的数字。
默认情况下,NioEventLoopGroup 使用 2*Num_CPU_cores
个定义的线程 here:
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads",
Runtime.getRuntime().availableProcessors() * 2));
可以通过设置
将此值覆盖为其他值
val group = new NioEventLoopGroup(16)
然后使用群组 create/setup bootstrap。
我希望使用 netty 客户端 bootstrap 打开多个连接,以便解析来自多个来源的消息。这些消息都具有相同的格式,但是,由于需要处理的数据量,我必须 运行 每个连接在单独的线程上(这是假设 netty 在每个客户端通道创建一个线程,我不能这样做'找不到参考 - 如果不是这样,这将如何实现?)。
这是我用来连接数据服务器的代码:
var b = new Bootstrap()
.group(group)
.channel(classOf[NioSocketChannel])
.handler(RawFeedChannelInitializer)
var ch1 = b.clone().connect(host, port).sync().channel();
var ch2 = b.clone().connect(host, port).sync().channel();
初始化程序调用 RawPacketDecoder
,它扩展了 ReplayingDecoder,并被定义为 here。
打开单个连接时,代码在没有 @Sharable
的情况下运行良好,但出于我的应用程序的目的,我必须多次连接到同一台服务器。
这导致 运行时间错误 @Sharable annotation is not allowed
指向我的 RawPacketDecoder
class。
我不完全确定如何解决这个问题,除了在 scala 中重新实现 ReplayingDecoder
的可实例化 class 作为我直接基于 ByteToMessageDecoder
的解码器。
如有任何帮助,我们将不胜感激。
注意:我使用的是netty 4.0.32 Final
我在 this StockExchange answer 中找到了解决方案。
我的问题是我使用的是基于对象的 ChannelInitializer(单例),ReplayingDecoder
和 ByteToMessageDecoder
不可共享。
我的初始值设定项是作为 Scala 对象创建的,因此允许单个实例。将初始化程序更改为 scala class 并为每个 bootstrap 克隆实例化解决了这个问题。我将上面的bootstrap代码修改如下:
var b = new Bootstrap()
.group(group)
.channel(classOf[NioSocketChannel])
//.handler(RawFeedChannelInitializer)
var ch1 = b.clone().handler(new RawFeedChannelInitializer()).connect(host, port).sync().channel();
var ch2 = b.clone().handler(new RawFeedChannelInitializer()).connect(host, port).sync().channel();
我不确定这是否能确保多线程,但它确实允许将数据访问分成多个连接到源服务器。
编辑更新: 在对该主题进行额外研究后,我确定 netty 实际上确实为每个通道创建了一个线程;这是通过在创建每个通道后打印到控制台来验证的:
println("No. of active threads: " + Thread.activeCount());
随着通道的创建和与其各自线程的关联,输出显示了一个递增的数字。
默认情况下,NioEventLoopGroup 使用 2*Num_CPU_cores
个定义的线程 here:
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads",
Runtime.getRuntime().availableProcessors() * 2));
可以通过设置
将此值覆盖为其他值val group = new NioEventLoopGroup(16)
然后使用群组 create/setup bootstrap。