将数据库连接与 Netty 4 事件执行器的线程相关联

Associating DB connections with threads of Netty 4 event executor

我正在开发一个 Netty 服务器,可用作 Android 应用程序的后端。在我当前的实现中,在逻辑处理程序中实现对 DB 的访问,由特殊的 Netty 线程池(不是 I/O 线程)执行,每个 Netty 通道使用一个 DB 连接,如下所示:

初始化:

EventExecutorGroup logicExecutor = new DefaultEventExecutorGroup(4);
EventLoopGroup acceptGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

try
{
    ServerBootstrap b = new ServerBootstrap();
    b.group(acceptGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 50)
        .childOption(ChannelOption.SO_KEEPALIVE, false)
        .childHandler(new ChannelInitializer<SocketChannel>()
        {
        @Override
        public void initChannel(SocketChannel ch) throws Exception
        {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new IdleStateHandler(5*60, 0, 0));
            pipeline.addLast(new ProtobufDelimitedFrameDecoder(65536));
            pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
            pipeline.addLast(new ProtobufDecoder(NetMsg.ClientMsg.getDefaultInstance()));
            pipeline.addLast(new ProtobufEncoder());
            pipeline.addLast(logicExecutor, "logic", new ChannelLogicHandler());
        }
        });

在通道激活时打开与数据库的连接:

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception
{
    dbConnection = DriverManager.getConnection(dbConnectParams[0], dbConnectParams[1], dbConnectParams[2]);
    if (dbConnection == null)
    throw new SQLException("Connection to database failed");

    super.channelActive(ctx);
}

...并在不活动的通道上关闭连接。

但据我了解,Netty 将其自己的线程池中的每个线程与其整个生命周期的通道相关联,因此在我的例子中,将 DefaultEventExecutorGroup(4) 用于逻辑处理程序意味着所有通道都将提供服务四个线程,对于任何给定的通道,将仅使用线程池中的一个线程。因此,为每个执行程序线程维护一个数据库连接足以在没有任何锁的情况下提供数据完整性(具有适当的事务隔离级别)。 所以我的问题是可以在线程池中为每个线程关联一个数据库连接,以便每个连接都在线程启动时建立(或在第一个通道与其关联时),以及如何实现?

如果您希望每个线程都有一个变量,您可以使用 ThreadLocal 字段。

像这样:

private static final ThreadLocal<DatabaseConnection> databaseConnection =
     new ThreadLocal<DatabaseConnection>() {
         @Override protected DatabaseConnection initialValue() {
             return DriverManager.getConnection(dbConnectParams[0], dbConnectParams[1], dbConnectParams[2]);
     }
 };

这样做,每个线程都会有一个不同的数据库连接。当第一次调用 get 方法完成时,线程的连接将通过调用创建数据库连接的 initialValue() 方法来初始化。后续调用将 return 与之前的值相同,但您也可以手动为此字段设置新值。

我想我自己找到了解决方案——每个 ChannelHandlerContext (ctx) 都有自己的 EventExecutor,本质上就是线程。所以我使用 hashmap 将数据库连接与执行程序相关联。在代码中:

//Declare hashmap in main server class
private final HashMap<EventExecutor,java.sql.Connection> execConsMap = new HashMap<>(4);

//............................

    public class ChannelLogicHandler extends ChannelInboundHandlerAdapter
{
private java.sql.Connection dbConnection = null; //DB connection saved as private member of logic handler
//........................

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception
{
    EventExecutor ex = ctx.executor(); //Get channel executor
    synchronized(execConsMap)
    {
    if (execConsMap.containsKey(ex)) //If already processed get DB connection from hashmap
    {
        dbConnection = execConsMap.get(ex);
    }
    else //Else create new connection and save in hashmap
    {
        java.sql.Connection dbc = DriverManager.getConnection(dbConnectParams[0], dbConnectParams[1], dbConnectParams[2]);
        if (dbc != null)
        {
        execConsMap.put(ex, dbc);
        dbConnection = dbc;
        }
        else
        {
        throw new SQLException("Connection to database failed");
        }
    }
    }

    System.out.println("New client connected");
    super.channelActive(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception
{
    try
    {
    if (!dbConnection.getAutoCommit())
    {
        dbConnection.rollback();
        dbConnection.setAutoCommit(true);
    }
    }
    catch (SQLException e) 
    { 
    e.printStackTrace(); 
    }

    System.out.println("Client disconnected");
    super.channelInactive(ctx);
}

数据库连接在服务器停止时关闭:

logicExecutor.shutdownGracefully().addListener(new GenericFutureListener()
     {
        @Override
        public void operationComplete(Future future) throws Exception
        {
            for (java.sql.Connection conn : execConsMap.values())
            {
                try 
                {
                    if (!conn.getAutoCommit())
                    conn.rollback();
                } catch (SQLException e) 
                { e.printStackTrace(); }

                try 
                {
                    conn.close();
                } catch (SQLException e) 
                { e.printStackTrace(); }
            }
        }
     });