How to export huge result set from database into several csv files and zip them on the fly?
I need to create a REST controller that pulls data from a database and writes it into CSV files, which are finally zipped together. Each CSV file should contain exactly 10 lines. In the end, all CSV files should be compressed into a single zip file. I want everything to happen on the fly, meaning that saving the files to a temporary location on disk is not an option. Can someone give me an example?
I found a piece of code that exports a large number of rows from the database into several CSV files and zips them on the fly. I think it can help many developers. I have tested the solution, and you can find the whole example at: https://github.com/idaamit/stream-from-db/tree/master
The controller:
@GetMapping(value = "/employees/{employeeId}/cars")
@ResponseStatus(HttpStatus.OK)
public ResponseEntity<StreamingResponseBody> getEmployeeCars(@PathVariable int employeeId) {
log.info("Going to export cars for employee {}", employeeId);
String zipFileName = "Cars Of Employee - " + employeeId;
return ResponseEntity.ok()
.header(HttpHeaders.CONTENT_TYPE, "application/zip")
.header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + zipFileName + ".zip\"")
.body(
employee.getCars(dataSource, employeeId));
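For completeness, here is a minimal sketch of the surrounding controller class. The employee and dataSource fields referenced above are not shown in the original snippet, so their wiring here is an assumption:

// Hypothetical wiring for the snippet above; field names are assumptions.
// The /stream-demo prefix in the test URL below is presumably the servlet
// context path (server.servlet.context-path), not part of this mapping.
@Slf4j
@RestController
public class EmployeeController {

    private final BasicDataSource dataSource; // e.g. injected by Spring
    private final Employee employee = new Employee();

    public EmployeeController(BasicDataSource dataSource) {
        this.dataSource = dataSource;
    }

    // getEmployeeCars(...) as shown above
}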
The Employee class, which first checks whether more than one CSV file needs to be prepared:
public class Employee {
public StreamingResponseBody getCars(BasicDataSource dataSource, int employeeId) {
StreamingResponseBody streamingResponseBody = new StreamingResponseBody() {
@Override
public void writeTo(OutputStream outputStream) throws IOException {
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
String sqlQuery = "SELECT [Id], [employeeId], [type], [text1] " +
"FROM Cars " +
"WHERE EmployeeID=? ";
PreparedStatementSetter preparedStatementSetter = new PreparedStatementSetter() {
public void setValues(PreparedStatement preparedStatement) throws SQLException {
preparedStatement.setInt(1, employeeId);
}
};
StreamingZipResultSetExtractor zipExtractor = new StreamingZipResultSetExtractor(outputStream, employeeId, isMoreThanOneFile(jdbcTemplate, employeeId));
Integer numberOfInteractionsSent = jdbcTemplate.query(sqlQuery, preparedStatementSetter, zipExtractor);
}
};
return streamingResponseBody;
}
private boolean isMoreThanOneFile(JdbcTemplate jdbcTemplate, int employeeId) {
Integer numberOfCars = getCount(jdbcTemplate, employeeId);
return numberOfCars >= StreamingZipResultSetExtractor.MAX_ROWS_IN_CSV;
}
private Integer getCount(JdbcTemplate jdbcTemplate, int employeeId) {
String sqlQuery = "SELECT count([Id]) " +
"FROM Cars " +
"WHERE EmployeeID=? ";
return jdbcTemplate.queryForObject(sqlQuery, new Object[] { employeeId }, Integer.class);
}
}
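Since StreamingResponseBody is a functional interface, the anonymous class above can also be written as a lambda. A behavior-preserving sketch (note that nothing here runs, including the count query in isMoreThanOneFile, until Spring invokes the body on its response-writing thread):

public StreamingResponseBody getCars(BasicDataSource dataSource, int employeeId) {
    return outputStream -> {
        JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
        String sqlQuery = "SELECT [Id], [employeeId], [type], [text1] FROM Cars WHERE EmployeeID=? ";
        StreamingZipResultSetExtractor zipExtractor = new StreamingZipResultSetExtractor(
                outputStream, employeeId, isMoreThanOneFile(jdbcTemplate, employeeId));
        jdbcTemplate.query(sqlQuery, ps -> ps.setInt(1, employeeId), zipExtractor);
    };
}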
The StreamingZipResultSetExtractor class is responsible for splitting the streamed CSV data into several files and zipping them:
@Slf4j
public class StreamingZipResultSetExtractor implements ResultSetExtractor<Integer> {
private final static int CHUNK_SIZE = 100000;
public final static int MAX_ROWS_IN_CSV = 10;
private OutputStream outputStream;
private int employeeId;
private StreamingCsvResultSetExtractor streamingCsvResultSetExtractor;
private boolean isInteractionCountExceedsLimit;
private int fileCount = 0;
public StreamingZipResultSetExtractor(OutputStream outputStream, int employeeId, boolean isInteractionCountExceedsLimit) {
this.outputStream = outputStream;
this.employeeId = employeeId;
this.streamingCsvResultSetExtractor = new StreamingCsvResultSetExtractor(employeeId);
this.isInteractionCountExceedsLimit = isInteractionCountExceedsLimit;
}
@Override
@SneakyThrows
public Integer extractData(ResultSet resultSet) throws DataAccessException {
log.info("Creating thread to extract data as zip file for employeeId {}", employeeId);
int lineCount = 1; //+1 for header row
try (PipedOutputStream internalOutputStream = streamingCsvResultSetExtractor.extractData(resultSet);
PipedInputStream pipedInputStream = new PipedInputStream(internalOutputStream);
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(pipedInputStream))) {
String currentLine;
String header = bufferedReader.readLine() + "\n";
try (ZipOutputStream zipOutputStream = new ZipOutputStream(outputStream)) {
createFile(employeeId, zipOutputStream, header);
while ((currentLine = bufferedReader.readLine()) != null) {
if (lineCount % MAX_ROWS_IN_CSV == 0) {
zipOutputStream.closeEntry();
createFile(employeeId, zipOutputStream, header);
lineCount++;
}
lineCount++;
currentLine += "\n";
zipOutputStream.write(currentLine.getBytes());
if (lineCount % CHUNK_SIZE == 0) {
zipOutputStream.flush();
}
}
}
} catch (IOException e) {
log.error("Task {} could not zip search results", employeeId, e);
}
log.info("Finished zipping all lines to {} file\s - total of {} lines of data for task {}", fileCount, lineCount - fileCount, employeeId);
return lineCount;
}
private void createFile(int employeeId, ZipOutputStream zipOutputStream, String header) {
String fileName = "Cars for Employee - " + employeeId;
if (isInteractionCountExceedsLimit) {
fileCount++;
fileName += " Part " + fileCount;
}
try {
zipOutputStream.putNextEntry(new ZipEntry(fileName + ".csv"));
zipOutputStream.write(header.getBytes());
} catch (IOException e) {
log.error("Could not create new zip entry for task {} ", employeeId, e);
}
}
}
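The two extractors are connected through a pipe: StreamingCsvResultSetExtractor writes CSV lines into a PipedOutputStream on a worker thread, while StreamingZipResultSetExtractor reads them back on the request thread and zips them, so no complete file is ever held in memory or on disk. A minimal, self-contained sketch of that pattern (class and variable names here are illustrative, not from the project):

import java.io.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PipeDemo {
    public static void main(String[] args) throws IOException {
        PipedOutputStream producerSide = new PipedOutputStream();
        PipedInputStream consumerSide = new PipedInputStream(producerSide);

        // Producer thread: writes lines into the pipe, then closes it so the
        // reader sees end-of-stream instead of blocking forever.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> {
            try (PipedOutputStream out = producerSide) {
                for (int i = 1; i <= 3; i++) {
                    out.write(("line " + i + "\n").getBytes());
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        executor.shutdown();

        // Consumer (here the main thread): reads until the producer closes.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(consumerSide))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}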
The StreamingCsvResultSetExtractor class is responsible for streaming the data from the result set as CSV lines. There is still more work to do to handle problematic special characters inside CSV cells (see the escaping sketch after the class):
@Slf4j
public class StreamingCsvResultSetExtractor implements ResultSetExtractor<PipedOutputStream> {
private final static int CHUNK_SIZE = 100000;
private PipedOutputStream pipedOutputStream;
private final int employeeId;
public StreamingCsvResultSetExtractor(int employeeId) {
this.employeeId = employeeId;
}
@SneakyThrows
@Override
public PipedOutputStream extractData(ResultSet resultSet) throws DataAccessException {
log.info("Creating thread to extract data as csv and save to file for task {}", employeeId);
this.pipedOutputStream = new PipedOutputStream();
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
prepareCsv(resultSet);
});
executor.shutdown(); // no more tasks; lets the worker thread exit once prepareCsv completes
return pipedOutputStream;
}
@SneakyThrows
private Integer prepareCsv(ResultSet resultSet) {
int interactionsSent = 1;
log.info("starting to extract data to csv lines");
streamHeaders(resultSet.getMetaData());
StringBuilder csvRowBuilder = new StringBuilder();
try {
int columnCount = resultSet.getMetaData().getColumnCount();
while (resultSet.next()) {
for (int i = 1; i < columnCount + 1; i++) {
if(resultSet.getString(i) != null && resultSet.getString(i).contains(",")){
String strToAppend = "\"" + resultSet.getString(i) + "\"";
csvRowBuilder.append(strToAppend);
} else {
csvRowBuilder.append(resultSet.getString(i));
}
csvRowBuilder.append(",");
}
int rowLength = csvRowBuilder.length();
csvRowBuilder.replace(rowLength - 1, rowLength, "\n");
pipedOutputStream.write(csvRowBuilder.toString().getBytes());
interactionsSent++;
csvRowBuilder.setLength(0);
if (interactionsSent % CHUNK_SIZE == 0) {
pipedOutputStream.flush();
}
}
} finally {
pipedOutputStream.flush();
pipedOutputStream.close();
}
log.debug("Created all csv lines for Task {} - total of {} rows", employeeId, interactionsSent);
return interactionsSent;
}
@SneakyThrows
private void streamHeaders(ResultSetMetaData resultSetMetaData) {
StringBuilder headersCsvBuilder = new StringBuilder();
for (int i = 1; i < resultSetMetaData.getColumnCount() + 1; i++) {
headersCsvBuilder.append(resultSetMetaData.getColumnLabel(i)).append(",");
}
int rowLength = headersCsvBuilder.length();
headersCsvBuilder.replace(rowLength - 1, rowLength, "\n");
pipedOutputStream.write(headersCsvBuilder.toString().getBytes());
}
}
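As noted above, the comma-only check is incomplete: a quote or line break inside a cell would still corrupt the output, and a null value is written as the literal string "null". A hedged sketch of an RFC 4180-style helper that could replace the contains(",") branch (escapeCsvCell is an illustrative name, not part of the project):

// Illustrative RFC 4180-style escaping: quote the cell if it contains a
// comma, a double quote, or a line break, and double any embedded quotes.
// A null value becomes an empty cell instead of the literal "null".
private static String escapeCsvCell(String value) {
    if (value == null) {
        return "";
    }
    if (value.contains(",") || value.contains("\"")
            || value.contains("\n") || value.contains("\r")) {
        return "\"" + value.replace("\"", "\"\"") + "\"";
    }
    return value;
}

With that in place, the loop body in prepareCsv would shrink to csvRowBuilder.append(escapeCsvCell(resultSet.getString(i))).append(",").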
To test this, execute http://localhost:8080/stream-demo/employees/3/cars
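From the command line, one way to verify the download (assuming the port and context path above) is:

curl -o cars.zip "http://localhost:8080/stream-demo/employees/3/cars"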