How do I execute a long-running task in Netty?

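I am fairly new to netty.io. I am trying to write an application that inserts more than 200 rows when a client connects for the first time. The problem is that only about 10 rows get inserted each time, and it looks as if Netty simply cuts my code off. I don't know how to set a maximum execution time for my task.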

This is how I start the server:

public void start() throws InterruptedException{
        EventLoopGroup group = new NioEventLoopGroup();
        EventExecutorGroup sqlExecutorGroup = new DefaultEventExecutorGroup(10);
        ServerBootstrap server = new ServerBootstrap();
        server.group(group);
        server.channel(NioServerSocketChannel.class);
        server.localAddress(new InetSocketAddress(port));
        server.childHandler(new ChannelInitializer<SocketChannel>(){
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new HttpServerCodec());
                    ch.pipeline().addLast(new ChunkedWriteHandler());
                    ch.pipeline().addLast(new HttpObjectAggregator(65536));
                    ch.pipeline().addLast(new WebSocketServerProtocolHandler("/"));
                    ch.pipeline().addLast(new WebSocketFrameToByteBuf());
                    ch.pipeline().addLast(new ProtobufEncoder());
                    ch.pipeline().addLast(new ProtobufDecoder(Chat.MsgWrapper.getDefaultInstance()));
                    ch.pipeline().addLast(new MsgUserLogin(prop.getProperty("sessions_prefix")));
           }
        });
        ChannelFuture f = server.bind().sync();
        f.channel().closeFuture().sync();   
    }

This is the function in my handler that runs the task:

private void register(ChannelHandlerContext ctx, Chat.User u){
        ctx.executor().execute(new Runnable() {
            @Override
            public void run() {
                ctx.channel().attr(Main.KeyUser).set(u);
                ConversationUserMember member =  ConversationPool.getOrCacheConversationUserMember(u);
                ConversationPool.UpdateConversations(member);
                Main.loggedUsers.add(ctx.channel());
                member.newChannel(ctx.channel());
                returnConversations(member,ctx); //this function is the one getting cut by netty.
            //    fireNewUser(u);
            }
        });
    }

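After more research, I found everything I was doing wrong:

  1. I was not adding the EventExecutorGroup to my handler.
  2. The code performing the inserts was not releasing its database connections properly. This was the main problem.
  3. In the handler I used ctx.executor().execute to run my code, which is unnecessary once the handler has been added with an EventExecutorGroup.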
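
The post does not show the insert code, so the following is only a sketch of how the second point could produce the symptom. It assumes a fixed-size connection pool; `TinyPool`, `PooledConnection`, and the pool size of 10 are hypothetical stand-ins chosen to mirror the "about 10 rows" observation, not details from the original application. If each insert takes a connection and never returns it, the work stops once the pool is empty, whereas try-with-resources hands the connection back after every row.

```java
import java.util.concurrent.Semaphore;

public class ConnectionReleaseSketch {

    /** Hypothetical stand-in for a pooled database connection. */
    static final class PooledConnection implements AutoCloseable {
        private final Semaphore permits;
        private boolean closed;

        PooledConnection(Semaphore permits) {
            this.permits = permits;
        }

        void insertRow(int row) {
            // A real implementation would execute an INSERT statement here.
        }

        @Override
        public void close() {
            // Return the permit to the pool exactly once.
            if (!closed) {
                closed = true;
                permits.release();
            }
        }
    }

    /** Hypothetical fixed-size pool; returns null when exhausted instead of blocking. */
    static final class TinyPool {
        private final Semaphore permits;

        TinyPool(int size) {
            this.permits = new Semaphore(size);
        }

        PooledConnection tryAcquire() {
            return permits.tryAcquire() ? new PooledConnection(permits) : null;
        }
    }

    /** Buggy variant: connections are never closed, so the pool drains. */
    static int insertLeaking(TinyPool pool, int rows) {
        int inserted = 0;
        for (int i = 0; i < rows; i++) {
            PooledConnection c = pool.tryAcquire();
            if (c == null) {
                break; // pool exhausted, remaining rows are never written
            }
            c.insertRow(i);
            inserted++;
        }
        return inserted;
    }

    /** Fixed variant: try-with-resources closes the connection after each row. */
    static int insertReleasing(TinyPool pool, int rows) {
        int inserted = 0;
        for (int i = 0; i < rows; i++) {
            try (PooledConnection c = pool.tryAcquire()) {
                if (c == null) {
                    break;
                }
                c.insertRow(i);
                inserted++;
            }
        }
        return inserted;
    }

    public static void main(String[] args) {
        System.out.println(insertLeaking(new TinyPool(10), 200));   // prints 10
        System.out.println(insertReleasing(new TinyPool(10), 200)); // prints 200
    }
}
```

A real pool would more likely block or time out than return null; the non-blocking variant is used here only so the sketch terminates.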

The working code looks like this:

public void start() throws InterruptedException{
        EventLoopGroup group = new NioEventLoopGroup();
        EventExecutorGroup sqlExecutorGroup = new DefaultEventExecutorGroup(10);
        ServerBootstrap server = new ServerBootstrap();
        server.group(group);
        server.channel(NioServerSocketChannel.class);
        server.localAddress(new InetSocketAddress(port));
        server.childHandler(new ChannelInitializer<SocketChannel>(){
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new HttpServerCodec());
                    ch.pipeline().addLast(new ChunkedWriteHandler());
                    ch.pipeline().addLast(new HttpObjectAggregator(65536));
                    ch.pipeline().addLast(new WebSocketServerProtocolHandler("/"));
                    ch.pipeline().addLast(new WebSocketFrameToByteBuf());
                    ch.pipeline().addLast(new ProtobufEncoder());
                    ch.pipeline().addLast(new ProtobufDecoder(Chat.MsgWrapper.getDefaultInstance()));
                    ch.pipeline().addLast(sqlExecutorGroup,new MsgUserLogin(prop.getProperty("sessions_prefix")));
           }
        });
        ChannelFuture f = server.bind().sync();
        f.channel().closeFuture().sync();   
    }

and:

private void register(ChannelHandlerContext ctx, Chat.User u){
        // No ctx.executor().execute(...) wrapper: the handler already runs on sqlExecutorGroup.
        ctx.channel().attr(Main.KeyUser).set(u);
        ConversationUserMember member = ConversationPool.getOrCacheConversationUserMember(u);
        ConversationPool.UpdateConversations(member);
        Main.loggedUsers.add(ctx.channel());
        member.newChannel(ctx.channel());
        returnConversations(member,ctx); // this was the call that appeared to get cut off
    //    fireNewUser(u);
    }
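
To see why the ctx.executor().execute wrapper was not moving any work, here is a plain-JDK analogy rather than Netty code. It assumes that ctx.executor() hands back the executor the handler is already bound to, so re-submitting a task to it stays on the same thread. The class name, method name, and thread names are hypothetical and chosen only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorAnalogy {

    /**
     * Runs an outer task on handlerExecutor that re-submits an inner task to
     * the same executor, and returns the name of the thread that ran the inner task.
     */
    static String threadOfResubmittedTask(ExecutorService handlerExecutor) throws Exception {
        CompletableFuture<String> inner = new CompletableFuture<>();
        handlerExecutor.execute(() ->
                handlerExecutor.execute(() ->
                        inner.complete(Thread.currentThread().getName())));
        return inner.get(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the I/O event loop the handler is bound to by default.
        ExecutorService eventLoop =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop"));
        // Stand-in for a separate executor assigned to the handler.
        ExecutorService sqlWorker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sql-worker"));
        try {
            System.out.println(threadOfResubmittedTask(eventLoop)); // prints event-loop
            System.out.println(threadOfResubmittedTask(sqlWorker)); // prints sql-worker
        } finally {
            eventLoop.shutdown();
            sqlWorker.shutdown();
        }
    }
}
```

In this analogy the thread changes only when the handler's own executor changes, which is what passing sqlExecutorGroup to addLast does in the working code.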