Java 并发调用的数据库查询和 C3P0
Java DB Queries and C3P0 for Concurrent calls
我正在努力思考 C3P0 和数据库调用。我最初在 SQLite 上有一个 运行 的程序,现在我试图允许并发测试 MariaDB 上的查询。有一些项目我没有掌握。 SQLite 的最初设计是有一个生产者线程将查询放入队列,以及一个消费者线程从队列中获取数据并向数据库发出数据库查询。
我想知道这个单线程是否能够发出并发请求(因为它只是一个线程)。
其次,我有一个问题,显然没有返回连接,或者它似乎在大约 18 个查询后停止。队列中仍有项目,但程序只是停止并等待新连接的尝试。
我的主数据库调用线程class:
public class DBRunnable extends DBExtend implements Runnable
{
/**
* Call the query builder instance
*/
protected QueryBuilder qb = QueryBuilder.getInstance();
/**
* Call the point type converter instance
*/
protected PointTypeConv pv = PointTypeConv.getInstance();
/**
* Singleton object
*/
private static DBRunnable db = null;
/**
* Constructor
*/
public DBRunnable()
{
}
/**
* Main thread functionality
*/
@Override
public void run()
{
try
{
while (true)
{
long startTime = 0;
QueryRequest msg = null;
try
{
// Pull any existing query requests off the queue, if not, wait for one.
msg = (QueryRequest) DBMigrationTool.dbProcQueue.take();
} catch (Exception e)
{
errorLog.error("Unable to fetch message from message processing queue.");
}
// Good practice to create a new result set instead of reusing
ResultSet rs = null;
Statement stmt = null;
// Fetch the query and the request out of the QueryRequest object
String query = msg.getQuery();
// Make sure the query request isn't empty, if it is, there is no point in sending it to the DB
try (Connection conn = cpds.getConnection())
{
// Execute the given query and fetch the result from it
stmt = conn.createStatement();
startTime = System.currentTimeMillis();
stmt.setQueryTimeout(1800);
System.out.println(query);
stmt.execute(query);
rs = stmt.getResultSet();
if (rs != null)
{
try
{
int count = 0;
while (rs.next())
{
count++;
}
System.out.println("Query Complete: " + (System.currentTimeMillis() - startTime) + "ms. Result count: " + count);
if (msg.getFlag() == 1)
{
DBMigrationTool.flag = 0;
}
} catch (Exception e)
{
errorLog.error("Failed to process database result set.");
}
}
conn.close();
} catch (SQLException e)
{
errorLog.error("Query Error: " + msg.getQuery());
errorLog.error("Failed to issue database command: " + e);
} finally
{
if (rs != null)
{
try
{
rs.close();
} catch (SQLException e)
{
errorLog.error("Failed to close JDBC result set.");
}
}
if (stmt != null)
{
try
{
stmt.close();
} catch (SQLException e)
{
errorLog.error("Failed to close JDBC statement.");
}
}
}
}
} finally
{
closeDB();
DBMigrationTool.dbProcHandle.cancel(true);
}
}
我的接口数据库 class 包含连接信息:
public class DBExtend
{
/**
* Standard timeout
*/
public static final int DB_TIMEOUT = 30;
/**
* Standard error logger for log4j2
*/
protected static Logger errorLog = LogManager.getLogger(DBExtend.class.getName());
/**
* Call to the query builder instance
*/
private static QueryBuilder qb = QueryBuilder.getInstance();
/**
* DB connection
*/
protected static ComboPooledDataSource cpds;
/**
* Constructor
*/
public DBExtend()
{
}
/**
* startDB is an initialization function used to open a database connection
*
* @param dbPath - System path to the database file
*/
public void startDB(String dbPath)
{
cpds = new ComboPooledDataSource();
cpds.setJdbcUrl("jdbc:sqlite:" + dbPath);
cpds.setMinPoolSize(1);
cpds.setTestConnectionOnCheckout(true);
cpds.setAcquireIncrement(5);
cpds.setMaxPoolSize(20);
errorLog.info("Connection to SQLite has been established.");
}
public void startMariaDB(String tableName)
{
cpds = new ComboPooledDataSource();
cpds.setJdbcUrl("jdbc:mariadb://localhost:3306/" + tableName);
cpds.setUser("root");
cpds.setPassword("joy");
cpds.setMinPoolSize(1);
cpds.setTestConnectionOnCheckout(true);
cpds.setAcquireIncrement(5);
cpds.setMaxPoolSize(20);
errorLog.info("Connection to MariaDB has been established.");
}
/**
* Close DB is to close a database instance
*/
public void closeDB()
{
try
{
cpds.close();
errorLog.info("Connection to SQLite has been closed.");
} catch (SQLException e)
{
errorLog.error(e.getMessage());
} finally
{
try
{
if (cpds.getConnection() != null)
{
cpds.getConnection().close();
}
if (cpds != null)
{
cpds.close();
}
} catch (SQLException ex)
{
errorLog.error(ex.getMessage());
}
}
}
}
JDBC 驱动程序必须是线程安全的,它抽象了实现细节。请注意,尽管驱动程序是线程安全的,但从多个线程同时使用同一个连接对象仍然不是一个好主意。
至于你的实际问题,你使用C3P0的数据源是完全错误的。由连接池支持的数据源使用 getConnection()
方法为用户提供来自该池的连接。当您关闭该连接时,该连接将返回到池中。
这意味着您从池中获取一个连接,完成您的工作,然后将其关闭,以便它返回到池中以供您的应用程序的其他部分使用。
这意味着DBRunnable
中的以下代码是错误的:
if (cpds.getConnection().isValid(DB_TIMEOUT))
您从池中获得一个连接,然后立即泄漏它(它没有返回到池中),因为您没有引用它。请注意,大多数连接池(有时是可选的)在返回连接之前会进行连接验证,因此没有必要对其进行测试。
与您的 DBExtend
class 类似,这是错误的:
在selectMariaDB
中:
cpds.getConnection().setCatalog(DBName);
在这里你从池中获得一个连接,并且永远不会关闭它,这意味着你有 'leaked' 这个连接。设置目录没有效果,因为此连接不会被重用。在这种情况下设置目录应该是连接池配置的一部分。
在closeDB
中:
cpds.getConnection().close();
这从池中获取连接并立即关闭它(将其返回到池中)。那没有实际意义。
我正在努力思考 C3P0 和数据库调用。我最初在 SQLite 上有一个 运行 的程序,现在我试图允许并发测试 MariaDB 上的查询。有一些项目我没有掌握。 SQLite 的最初设计是有一个生产者线程将查询放入队列,以及一个消费者线程从队列中获取数据并向数据库发出数据库查询。
我想知道这个单线程是否能够发出并发请求(因为它只是一个线程)。
其次,我有一个问题,显然没有返回连接,或者它似乎在大约 18 个查询后停止。队列中仍有项目,但程序只是停止并等待新连接的尝试。
我的主数据库调用线程class:
public class DBRunnable extends DBExtend implements Runnable
{
/**
* Call the query builder instance
*/
protected QueryBuilder qb = QueryBuilder.getInstance();
/**
* Call the point type converter instance
*/
protected PointTypeConv pv = PointTypeConv.getInstance();
/**
* Singleton object
*/
private static DBRunnable db = null;
/**
* Constructor
*/
public DBRunnable()
{
}
/**
* Main thread functionality
*/
@Override
public void run()
{
try
{
while (true)
{
long startTime = 0;
QueryRequest msg = null;
try
{
// Pull any existing query requests off the queue, if not, wait for one.
msg = (QueryRequest) DBMigrationTool.dbProcQueue.take();
} catch (Exception e)
{
errorLog.error("Unable to fetch message from message processing queue.");
}
// Good practice to create a new result set instead of reusing
ResultSet rs = null;
Statement stmt = null;
// Fetch the query and the request out of the QueryRequest object
String query = msg.getQuery();
// Make sure the query request isn't empty, if it is, there is no point in sending it to the DB
try (Connection conn = cpds.getConnection())
{
// Execute the given query and fetch the result from it
stmt = conn.createStatement();
startTime = System.currentTimeMillis();
stmt.setQueryTimeout(1800);
System.out.println(query);
stmt.execute(query);
rs = stmt.getResultSet();
if (rs != null)
{
try
{
int count = 0;
while (rs.next())
{
count++;
}
System.out.println("Query Complete: " + (System.currentTimeMillis() - startTime) + "ms. Result count: " + count);
if (msg.getFlag() == 1)
{
DBMigrationTool.flag = 0;
}
} catch (Exception e)
{
errorLog.error("Failed to process database result set.");
}
}
conn.close();
} catch (SQLException e)
{
errorLog.error("Query Error: " + msg.getQuery());
errorLog.error("Failed to issue database command: " + e);
} finally
{
if (rs != null)
{
try
{
rs.close();
} catch (SQLException e)
{
errorLog.error("Failed to close JDBC result set.");
}
}
if (stmt != null)
{
try
{
stmt.close();
} catch (SQLException e)
{
errorLog.error("Failed to close JDBC statement.");
}
}
}
}
} finally
{
closeDB();
DBMigrationTool.dbProcHandle.cancel(true);
}
}
我的接口数据库 class 包含连接信息:
public class DBExtend
{
/**
* Standard timeout
*/
public static final int DB_TIMEOUT = 30;
/**
* Standard error logger for log4j2
*/
protected static Logger errorLog = LogManager.getLogger(DBExtend.class.getName());
/**
* Call to the query builder instance
*/
private static QueryBuilder qb = QueryBuilder.getInstance();
/**
* DB connection
*/
protected static ComboPooledDataSource cpds;
/**
* Constructor
*/
public DBExtend()
{
}
/**
* startDB is an initialization function used to open a database connection
*
* @param dbPath - System path to the database file
*/
public void startDB(String dbPath)
{
cpds = new ComboPooledDataSource();
cpds.setJdbcUrl("jdbc:sqlite:" + dbPath);
cpds.setMinPoolSize(1);
cpds.setTestConnectionOnCheckout(true);
cpds.setAcquireIncrement(5);
cpds.setMaxPoolSize(20);
errorLog.info("Connection to SQLite has been established.");
}
public void startMariaDB(String tableName)
{
cpds = new ComboPooledDataSource();
cpds.setJdbcUrl("jdbc:mariadb://localhost:3306/" + tableName);
cpds.setUser("root");
cpds.setPassword("joy");
cpds.setMinPoolSize(1);
cpds.setTestConnectionOnCheckout(true);
cpds.setAcquireIncrement(5);
cpds.setMaxPoolSize(20);
errorLog.info("Connection to MariaDB has been established.");
}
/**
* Close DB is to close a database instance
*/
public void closeDB()
{
try
{
cpds.close();
errorLog.info("Connection to SQLite has been closed.");
} catch (SQLException e)
{
errorLog.error(e.getMessage());
} finally
{
try
{
if (cpds.getConnection() != null)
{
cpds.getConnection().close();
}
if (cpds != null)
{
cpds.close();
}
} catch (SQLException ex)
{
errorLog.error(ex.getMessage());
}
}
}
}
JDBC 驱动程序必须是线程安全的,它抽象了实现细节。请注意,尽管驱动程序是线程安全的,但从多个线程同时使用同一个连接对象仍然不是一个好主意。
至于你的实际问题,你使用C3P0的数据源是完全错误的。由连接池支持的数据源使用 getConnection()
方法为用户提供来自该池的连接。当您关闭该连接时,该连接将返回到池中。
这意味着您从池中获取一个连接,完成您的工作,然后将其关闭,以便它返回到池中以供您的应用程序的其他部分使用。
这意味着DBRunnable
中的以下代码是错误的:
if (cpds.getConnection().isValid(DB_TIMEOUT))
您从池中获得一个连接,然后立即泄漏它(它没有返回到池中),因为您没有引用它。请注意,大多数连接池(有时是可选的)在返回连接之前会进行连接验证,因此没有必要对其进行测试。
与您的 DBExtend
class 类似,这是错误的:
在selectMariaDB
中:
cpds.getConnection().setCatalog(DBName);
在这里你从池中获得一个连接,并且永远不会关闭它,这意味着你有 'leaked' 这个连接。设置目录没有效果,因为此连接不会被重用。在这种情况下设置目录应该是连接池配置的一部分。
在closeDB
中:
cpds.getConnection().close();
这从池中获取连接并立即关闭它(将其返回到池中)。那没有实际意义。