需要将大型 QueryRunner 结果流式传输到文件,似乎存储在内存中
Need to stream large QueryRunner result to file, seems to be storing in memory
我正在尝试构建一个 Java 应用程序,它可以将任意 SQL SELECT 查询的非常大的结果集流式传输到 JSONL 文件中,特别是通过 SQLServer但想 运行 与任何 JDBC DataSource
。在 Python 中,将 sql 客户端结果视为生成器然后调用 json.dumps()
很容易。然而,在这段代码中,它似乎在写出之前将所有内容都放入内存中,通常会导致堆和垃圾收集异常。我需要这个 运行 的查询非常大,最多可以带回 10GB 的原始数据。执行时间不是主要关注点,只要每次都能正常工作即可。
我试过在每一行之后调用 flush(这很荒谬),这似乎对小数据集有帮助,但对大数据集没有帮助。任何人都可以建议我可以用来轻松完成此操作的策略吗?
在我的 SQL 客户端 class 中,我使用 Apache DbUtils QueryRunner
和 MapListHandler
创建一个 Map
的列表,这是我需要的灵活性(相对于 Java 中需要指定架构和类型的更传统方法):
public List<Map<String, Object>> query(String queryText) {
try {
DbUtils.loadDriver("com.microsoft.sqlserver.jdbc.Driver");
// this function just sets up all the connection properties. Ommitted for clarity
DataSource ds = this.initDataSource();
StatementConfiguration sc = new StatementConfiguration.Builder().fetchSize(10000).build();
QueryRunner queryRunner = new QueryRunner(ds, sc);
MapListHandler handler = new MapListHandler();
return queryRunner.query(queryText, handler);
} catch (Exception e) {
logger.error(e.getMessage());
e.printStackTrace();
return null;
}
}
JsonLOutputWriter
class:
JsonLOutputWriter(String filename) {
GsonBuilder gsonBuilder = new GsonBuilder();
gsonBuilder.serializeNulls();
this.gson = gsonBuilder.create();
try {
this.writer = new PrintWriter(new File(filename), ENCODING);
} catch (FileNotFoundException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
void writeRow(Map row) {
this.writer.println(this.gson.toJson(row));
}
void flush() {
this.writer.flush();
}
主要方法:
JsonLOutputWriter writer = new JsonLOutputWriter(outputFile)
for (Map row : client.query(inputSql)) {
writer.writeRow(row);
}
writer.flush()
基本上,开箱即用 DbUtils
无法做到这一点。我摆脱了 QueryRunner
和 MapListHandler
,因为处理程序创建了一个 ArrayList
。我不是基于拉式,而是基于推式,创建一个非常相似的 MyQueryRunner
,它采用 MyRowHandler
而不是 return 集合只是迭代 ResultSet
并调用我的输出函数。
我敢肯定有更优雅的方法可以做到这一点和 return 某种行缓冲区,但这是我需要的 80/20 并且适用于大型数据集。
RowHandler
public class RowHandler {
private static final RowProcessor ROW_PROCESSOR = new BasicRowProcessor();
private JsonLOutputWriter writer;
public RowHandler(JsonLOutputWriter writer) {
this.writer = writer;
}
int handle(ResultSet rs) throws SQLException {
AtomicInteger counter = new AtomicInteger();
while (rs.next()) {
writer.writeRow(this.handleRow(rs));
counter.getAndIncrement();
}
return counter.intValue();
}
protected Map<String, Object> handleRow(ResultSet rs) throws SQLException {
return this.ROW_PROCESSOR.toMap(rs);
}
}
QueryHandler
class CustomQueryRunner extends AbstractQueryRunner {
private final RowHandler rh;
CustomQueryRunner(DataSource ds, StatementConfiguration stmtConfig, RowHandler rh) {
super(ds, stmtConfig);
this.rh = rh;
}
int query(String sql) throws SQLException {
Connection conn = this.prepareConnection();
return this.query(conn, true, sql);
}
private int query(Connection conn, boolean closeConn, String sql, Object... params)
throws SQLException {
if (conn == null) {
throw new SQLException("Null connection");
}
PreparedStatement stmt = null;
ResultSet rs = null;
int count = 0;
try {
stmt = this.prepareStatement(conn, sql);
this.fillStatement(stmt, params);
rs = this.wrap(stmt.executeQuery());
count = rh.handle(rs);
} catch (SQLException e) {
this.rethrow(e, sql, params);
} finally {
try {
close(rs);
} finally {
close(stmt);
if (closeConn) {
close(conn);
}
}
}
return count;
}
}
我正在尝试构建一个 Java 应用程序,它可以将任意 SQL SELECT 查询的非常大的结果集流式传输到 JSONL 文件中,特别是通过 SQLServer但想 运行 与任何 JDBC DataSource
。在 Python 中,将 sql 客户端结果视为生成器然后调用 json.dumps()
很容易。然而,在这段代码中,它似乎在写出之前将所有内容都放入内存中,通常会导致堆和垃圾收集异常。我需要这个 运行 的查询非常大,最多可以带回 10GB 的原始数据。执行时间不是主要关注点,只要每次都能正常工作即可。
我试过在每一行之后调用 flush(这很荒谬),这似乎对小数据集有帮助,但对大数据集没有帮助。任何人都可以建议我可以用来轻松完成此操作的策略吗?
在我的 SQL 客户端 class 中,我使用 Apache DbUtils QueryRunner
和 MapListHandler
创建一个 Map
的列表,这是我需要的灵活性(相对于 Java 中需要指定架构和类型的更传统方法):
public List<Map<String, Object>> query(String queryText) {
try {
DbUtils.loadDriver("com.microsoft.sqlserver.jdbc.Driver");
// this function just sets up all the connection properties. Ommitted for clarity
DataSource ds = this.initDataSource();
StatementConfiguration sc = new StatementConfiguration.Builder().fetchSize(10000).build();
QueryRunner queryRunner = new QueryRunner(ds, sc);
MapListHandler handler = new MapListHandler();
return queryRunner.query(queryText, handler);
} catch (Exception e) {
logger.error(e.getMessage());
e.printStackTrace();
return null;
}
}
JsonLOutputWriter
class:
JsonLOutputWriter(String filename) {
GsonBuilder gsonBuilder = new GsonBuilder();
gsonBuilder.serializeNulls();
this.gson = gsonBuilder.create();
try {
this.writer = new PrintWriter(new File(filename), ENCODING);
} catch (FileNotFoundException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
void writeRow(Map row) {
this.writer.println(this.gson.toJson(row));
}
void flush() {
this.writer.flush();
}
主要方法:
JsonLOutputWriter writer = new JsonLOutputWriter(outputFile)
for (Map row : client.query(inputSql)) {
writer.writeRow(row);
}
writer.flush()
基本上,开箱即用 DbUtils
无法做到这一点。我摆脱了 QueryRunner
和 MapListHandler
,因为处理程序创建了一个 ArrayList
。我不是基于拉式,而是基于推式,创建一个非常相似的 MyQueryRunner
,它采用 MyRowHandler
而不是 return 集合只是迭代 ResultSet
并调用我的输出函数。
我敢肯定有更优雅的方法可以做到这一点和 return 某种行缓冲区,但这是我需要的 80/20 并且适用于大型数据集。
RowHandler
public class RowHandler {
private static final RowProcessor ROW_PROCESSOR = new BasicRowProcessor();
private JsonLOutputWriter writer;
public RowHandler(JsonLOutputWriter writer) {
this.writer = writer;
}
int handle(ResultSet rs) throws SQLException {
AtomicInteger counter = new AtomicInteger();
while (rs.next()) {
writer.writeRow(this.handleRow(rs));
counter.getAndIncrement();
}
return counter.intValue();
}
protected Map<String, Object> handleRow(ResultSet rs) throws SQLException {
return this.ROW_PROCESSOR.toMap(rs);
}
}
QueryHandler
class CustomQueryRunner extends AbstractQueryRunner {
private final RowHandler rh;
CustomQueryRunner(DataSource ds, StatementConfiguration stmtConfig, RowHandler rh) {
super(ds, stmtConfig);
this.rh = rh;
}
int query(String sql) throws SQLException {
Connection conn = this.prepareConnection();
return this.query(conn, true, sql);
}
private int query(Connection conn, boolean closeConn, String sql, Object... params)
throws SQLException {
if (conn == null) {
throw new SQLException("Null connection");
}
PreparedStatement stmt = null;
ResultSet rs = null;
int count = 0;
try {
stmt = this.prepareStatement(conn, sql);
this.fillStatement(stmt, params);
rs = this.wrap(stmt.executeQuery());
count = rh.handle(rs);
} catch (SQLException e) {
this.rethrow(e, sql, params);
} finally {
try {
close(rs);
} finally {
close(stmt);
if (closeConn) {
close(conn);
}
}
}
return count;
}
}