Связывание подключений к БД с потоками исполнителя событий Netty 4

Я разрабатываю сервер Netty, который можно использовать в качестве бэкенда для приложения Android. В моей текущей реализации доступ к БД реализован в логическом обработчике, выполняемом специальным пулом потоков Netty (не потоками ввода-вывода), используя одно соединение с БД на канал Netty, например:

Инициализировать:

EventExecutorGroup logicExecutor = new DefaultEventExecutorGroup(4);
EventLoopGroup acceptGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

try
{
    ServerBootstrap b = new ServerBootstrap();
    b.group(acceptGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 50)
        .childOption(ChannelOption.SO_KEEPALIVE, false)
        .childHandler(new ChannelInitializer<SocketChannel>()
        {
        @Override
        public void initChannel(SocketChannel ch) throws Exception
        {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new IdleStateHandler(5*60, 0, 0));
            pipeline.addLast(new ProtobufDelimitedFrameDecoder(65536));
            pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
            pipeline.addLast(new ProtobufDecoder(NetMsg.ClientMsg.getDefaultInstance()));
            pipeline.addLast(new ProtobufEncoder());
            pipeline.addLast(logicExecutor, "logic", new ChannelLogicHandler());
        }
        });

Открытое соединение с БД при активации канала:

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception
{
    dbConnection = DriverManager.getConnection(dbConnectParams[0], dbConnectParams[1], dbConnectParams[2]);
    if (dbConnection == null)
    throw new SQLException("Connection to database failed");

    super.channelActive(ctx);
}

... и закрыть соединение на канале неактивно.

Но, насколько я понимаю, Netty связывает каждый поток в своем собственном пуле потоков с каналами для всего их жизненного цикла, поэтому в моем случае использование DefaultEventExecutorGroup(4) для логического обработчика означает, что все каналы будут обслуживаться четырьмя потоками и для любого данный канал будет использовать только один поток из пула потоков. Таким образом, поддержание одного DB-соединения на каждый поток исполнителя является достаточным для обеспечения целостности данных без каких-либо блокировок (с соответствующим уровнем изоляции транзакции). Поэтому мой вопрос: возможно ли связать одно DB-соединение на поток в пуле потоков, чтобы каждое соединение было установлено при запуске потока (или при ассоциировании с ним первого канала), и как это можно реализовать?

2 ответа

Решение

Думаю, я нашел решение сам - каждый ChannelHandlerContext (ctx) имеет свой собственный EventExecutor, который, по сути, является потоком. Поэтому я использую hashmap, чтобы связать соединения с базой данных с исполнителями. В коде:

//Declare hashmap in main server class
private final HashMap<EventExecutor,java.sql.Connection> execConsMap = new HashMap<>(4);

//........................

    public class ChannelLogicHandler extends ChannelInboundHandlerAdapter
{
private java.sql.Connection dbConnection = null; //DB connection saved as private member of logic handler
//........................

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception
{
    EventExecutor ex = ctx.executor(); //Get channel executor
    synchronized(execConsMap)
    {
    if (execConsMap.containsKey(ex)) //If already processed get DB connection from hashmap
    {
        dbConnection = execConsMap.get(ex);
    }
    else //Else create new connection and save in hashmap
    {
        java.sql.Connection dbc = DriverManager.getConnection(dbConnectParams[0], dbConnectParams[1], dbConnectParams[2]);
        if (dbc != null)
        {
        execConsMap.put(ex, dbc);
        dbConnection = dbc;
        }
        else
        {
        throw new SQLException("Connection to database failed");
        }
    }
    }

    System.out.println("New client connected");
    super.channelActive(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception
{
    try
    {
    if (!dbConnection.getAutoCommit())
    {
        dbConnection.rollback();
        dbConnection.setAutoCommit(true);
    }
    }
    catch (SQLException e) 
    { 
    e.printStackTrace(); 
    }

    System.out.println("Client disconnected");
    super.channelInactive(ctx);
}

Соединения с БД закрыты при остановке сервера:

logicExecutor.shutdownGracefully().addListener(new GenericFutureListener()
     {
        @Override
        public void operationComplete(Future future) throws Exception
        {
            for (java.sql.Connection conn : execConsMap.values())
            {
                try 
                {
                    if (!conn.getAutoCommit())
                    conn.rollback();
                } catch (SQLException e) 
                { e.printStackTrace(); }

                try 
                {
                    conn.close();
                } catch (SQLException e) 
                { e.printStackTrace(); }
            }
        }
     });

Если вам нужна переменная для каждого потока, вы можете использовать поле ThreadLocal.

Что-то вроде этого:

private static final ThreadLocal<DatabaseConnection> databaseConnection =
     new ThreadLocal<DatabaseConnection>() {
         @Override protected DatabaseConnection initialValue() {
             return DriverManager.getConnection(dbConnectParams[0], dbConnectParams[1], dbConnectParams[2]);
     }
 };

При этом каждый поток будет иметь различное соединение с базой данных. Когда первый вызов метода get будет выполнен, соединение для потока будет инициализировано путем вызова initialValue() метод, который создает соединение с базой данных. Последующие вызовы вернут то же самое предыдущее значение, но вы также можете вручную установить новое значение для этого поля.

Другие вопросы по тегам