I need to create a REST controller that pulls data from a database and writes it to CSV files, which are ultimately zipped together. Each CSV file should contain exactly 10 rows. In the end, all the CSV files should be compressed into a single zip file. I want everything to happen on the fly, meaning that saving the files to a temporary location on disk is not an option. Can someone give me an example?
smdncfj31#
I found a very nice piece of code that exports a large number of rows from the database into several CSV files and zips them on the fly. I think it can help a lot of developers. I have tested the solution, and you can find the complete example at https://github.com/idaamit/stream-from-db/tree/master. The controller is:
@GetMapping(value = "/employees/{employeeId}/cars")
@ResponseStatus(HttpStatus.OK)
public ResponseEntity<StreamingResponseBody> getEmployeeCars(@PathVariable int employeeId) {
    log.info("Going to export cars for employee {}", employeeId);
    String zipFileName = "Cars Of Employee - " + employeeId;
    return ResponseEntity.ok()
            .header(HttpHeaders.CONTENT_TYPE, "application/zip")
            .header(HttpHeaders.CONTENT_DISPOSITION, "attachment;filename=" + zipFileName + ".zip")
            .body(employee.getCars(dataSource, employeeId));
}
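Here employee and dataSource are fields of the controller, wired in elsewhere in the linked project. One thing to keep in mind: a StreamingResponseBody is written asynchronously, on a separate thread, after the handler returns, so Spring MVC's async support must be enabled with a timeout generous enough for large exports. The linked project may already configure this; the following is only a minimal sketch under that assumption (the class name AsyncConfig, the timeout, and the pool size are my own choices, not taken from the repo):

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor;
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

// Hypothetical async configuration - not part of the answer's repo.
@Configuration
public class AsyncConfig implements WebMvcConfigurer {

    @Override
    public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
        // large exports can take a while; the default async timeout may be too short
        configurer.setDefaultTimeout(TimeUnit.MINUTES.toMillis(10));
        // dedicated pool for the threads that write the streaming response bodies
        configurer.setTaskExecutor(new ConcurrentTaskExecutor(Executors.newFixedThreadPool(4)));
    }
}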
The Employee class, which first checks whether we need to prepare more than one CSV:
public class Employee {

    public StreamingResponseBody getCars(BasicDataSource dataSource, int employeeId) {
        StreamingResponseBody streamingResponseBody = new StreamingResponseBody() {
            @Override
            public void writeTo(OutputStream outputStream) throws IOException {
                JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
                String sqlQuery = "SELECT [Id], [employeeId], [type], [text1] " +
                        "FROM Cars " +
                        "WHERE EmployeeID=? ";
                PreparedStatementSetter preparedStatementSetter = new PreparedStatementSetter() {
                    public void setValues(PreparedStatement preparedStatement) throws SQLException {
                        preparedStatement.setInt(1, employeeId);
                    }
                };
                // the extractor writes the zipped CSVs straight into the HTTP response stream
                StreamingZipResultSetExtractor zipExtractor = new StreamingZipResultSetExtractor(
                        outputStream, employeeId, isMoreThanOneFile(jdbcTemplate, employeeId));
                Integer numberOfInteractionsSent = jdbcTemplate.query(sqlQuery, preparedStatementSetter, zipExtractor);
            }
        };
        return streamingResponseBody;
    }

    private boolean isMoreThanOneFile(JdbcTemplate jdbcTemplate, int employeeId) {
        Integer numberOfCars = getCount(jdbcTemplate, employeeId);
        // each CSV holds MAX_ROWS_IN_CSV lines including the header, i.e. 9 data rows,
        // so 10 or more cars means the export must be split into parts
        return numberOfCars >= StreamingZipResultSetExtractor.MAX_ROWS_IN_CSV;
    }

    private Integer getCount(JdbcTemplate jdbcTemplate, int employeeId) {
        String sqlQuery = "SELECT count([Id]) " +
                "FROM Cars " +
                "WHERE EmployeeID=? ";
        return jdbcTemplate.queryForObject(sqlQuery, new Object[] { employeeId }, Integer.class);
    }
}
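One detail the answer does not cover: whether the rows are actually streamed depends on the JDBC driver's fetch size. Without a hint, some drivers buffer the entire result set in memory, which defeats the point of streaming. The setFetchSize call below is standard JdbcTemplate API, but the right value is driver-specific (MySQL, for example, only streams with Integer.MIN_VALUE); treat this as a sketch to adapt:

JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
// hint the driver to fetch rows in batches instead of buffering them all
jdbcTemplate.setFetchSize(1000);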
The StreamingZipResultSetExtractor class is responsible for splitting the streamed CSV data into several files and zipping them:
@Slf4j
public class StreamingZipResultSetExtractor implements ResultSetExtractor<Integer> {

    private final static int CHUNK_SIZE = 100000;
    public final static int MAX_ROWS_IN_CSV = 10;
    private OutputStream outputStream;
    private int employeeId;
    private StreamingCsvResultSetExtractor streamingCsvResultSetExtractor;
    private boolean isInteractionCountExceedsLimit;
    private int fileCount = 0;

    public StreamingZipResultSetExtractor(OutputStream outputStream, int employeeId,
                                          boolean isInteractionCountExceedsLimit) {
        this.outputStream = outputStream;
        this.employeeId = employeeId;
        this.streamingCsvResultSetExtractor = new StreamingCsvResultSetExtractor(employeeId);
        this.isInteractionCountExceedsLimit = isInteractionCountExceedsLimit;
    }

    @Override
    @SneakyThrows
    public Integer extractData(ResultSet resultSet) throws DataAccessException {
        log.info("Creating thread to extract data as zip file for employeeId {}", employeeId);
        int lineCount = 1; // +1 for the header row
        try (PipedOutputStream internalOutputStream = streamingCsvResultSetExtractor.extractData(resultSet);
             PipedInputStream pipedInputStream = new PipedInputStream(internalOutputStream);
             BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(pipedInputStream))) {
            String currentLine;
            // the first line produced by the CSV extractor is the header; repeat it in every part
            String header = bufferedReader.readLine() + "\n";
            try (ZipOutputStream zipOutputStream = new ZipOutputStream(outputStream)) {
                createFile(employeeId, zipOutputStream, header);
                while ((currentLine = bufferedReader.readLine()) != null) {
                    if (lineCount % MAX_ROWS_IN_CSV == 0) {
                        // current entry is full - close it and start the next CSV part
                        zipOutputStream.closeEntry();
                        createFile(employeeId, zipOutputStream, header);
                        lineCount++;
                    }
                    lineCount++;
                    currentLine += "\n";
                    zipOutputStream.write(currentLine.getBytes());
                    if (lineCount % CHUNK_SIZE == 0) {
                        zipOutputStream.flush();
                    }
                }
            }
        } catch (IOException e) {
            log.error("Task {} could not zip search results", employeeId, e);
        }
        log.info("Finished zipping all lines to {} file(s) - total of {} lines of data for task {}",
                fileCount, lineCount - fileCount, employeeId);
        return lineCount;
    }

    private void createFile(int employeeId, ZipOutputStream zipOutputStream, String header) {
        String fileName = "Cars for Employee - " + employeeId;
        if (isInteractionCountExceedsLimit) {
            fileCount++;
            fileName += " Part " + fileCount;
        }
        try {
            zipOutputStream.putNextEntry(new ZipEntry(fileName + ".csv"));
            zipOutputStream.write(header.getBytes());
        } catch (IOException e) {
            log.error("Could not create new zip entry for task {} ", employeeId, e);
        }
    }
}
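To see the splitting logic in isolation from the database and the piped streams, here is a self-contained toy sketch of the same technique (my own example, not from the linked repo): one ZipOutputStream, a new ZipEntry every MAX_ROWS_IN_CSV lines, and the header repeated at the top of each part. A ByteArrayOutputStream stands in for the HTTP response stream:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

public class ZipSplitDemo {

    // counts the header line, matching the answer's extractor: 1 header + 9 data rows
    private static final int MAX_ROWS_IN_CSV = 10;

    public static void main(String[] args) throws IOException {
        String header = "Id,employeeId,type,text1\n";
        List<String> rows = IntStream.rangeClosed(1, 25)
                .mapToObj(i -> i + ",3,sedan,car " + i + "\n")
                .collect(Collectors.toList());

        ByteArrayOutputStream target = new ByteArrayOutputStream(); // stands in for the response stream
        try (ZipOutputStream zip = new ZipOutputStream(target)) {
            int part = 0;
            int linesInEntry = 0;
            for (String row : rows) {
                if (linesInEntry == 0) { // start a new CSV entry, header first
                    zip.putNextEntry(new ZipEntry("Cars Part " + (++part) + ".csv"));
                    zip.write(header.getBytes(StandardCharsets.UTF_8));
                    linesInEntry = 1;
                }
                zip.write(row.getBytes(StandardCharsets.UTF_8));
                if (++linesInEntry == MAX_ROWS_IN_CSV) { // entry full
                    zip.closeEntry();
                    linesInEntry = 0;
                }
            }
        } // closing the zip stream also finishes a partially filled last entry
        Files.write(Paths.get("demo.zip"), target.toByteArray()); // inspect the result locally
    }
}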
The StreamingCsvResultSetExtractor class is responsible for streaming the data from the result set into CSV lines. More work remains to handle special characters that are problematic inside CSV cells:
@Slf4j
public class StreamingCsvResultSetExtractor implements ResultSetExtractor<PipedOutputStream> {

    private final static int CHUNK_SIZE = 100000;
    private PipedOutputStream pipedOutputStream;
    private final int employeeId;

    public StreamingCsvResultSetExtractor(int employeeId) {
        this.employeeId = employeeId;
    }

    @SneakyThrows
    @Override
    public PipedOutputStream extractData(ResultSet resultSet) throws DataAccessException {
        log.info("Creating thread to extract data as csv and save to file for task {}", employeeId);
        // NOTE: the caller must connect a PipedInputStream to this stream promptly;
        // writes from the background thread fail with "Pipe not connected" until then
        this.pipedOutputStream = new PipedOutputStream();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> prepareCsv(resultSet));
        executor.shutdown(); // the submitted task still runs; no new tasks are accepted
        return pipedOutputStream;
    }

    @SneakyThrows
    private Integer prepareCsv(ResultSet resultSet) {
        int interactionsSent = 1;
        log.info("starting to extract data to csv lines");
        streamHeaders(resultSet.getMetaData());
        StringBuilder csvRowBuilder = new StringBuilder();
        try {
            int columnCount = resultSet.getMetaData().getColumnCount();
            while (resultSet.next()) {
                for (int i = 1; i < columnCount + 1; i++) {
                    String value = resultSet.getString(i);
                    if (value != null && value.contains(",")) {
                        // quote cells that contain the separator
                        csvRowBuilder.append("\"").append(value).append("\"");
                    } else {
                        csvRowBuilder.append(value);
                    }
                    csvRowBuilder.append(",");
                }
                int rowLength = csvRowBuilder.length();
                csvRowBuilder.replace(rowLength - 1, rowLength, "\n"); // swap trailing comma for newline
                pipedOutputStream.write(csvRowBuilder.toString().getBytes());
                interactionsSent++;
                csvRowBuilder.setLength(0);
                if (interactionsSent % CHUNK_SIZE == 0) {
                    pipedOutputStream.flush();
                }
            }
        } finally {
            pipedOutputStream.flush();
            pipedOutputStream.close();
        }
        log.debug("Created all csv lines for Task {} - total of {} rows", employeeId, interactionsSent);
        return interactionsSent;
    }

    @SneakyThrows
    private void streamHeaders(ResultSetMetaData resultSetMetaData) {
        StringBuilder headersCsvBuilder = new StringBuilder();
        for (int i = 1; i < resultSetMetaData.getColumnCount() + 1; i++) {
            headersCsvBuilder.append(resultSetMetaData.getColumnLabel(i)).append(",");
        }
        int rowLength = headersCsvBuilder.length();
        headersCsvBuilder.replace(rowLength - 1, rowLength, "\n");
        pipedOutputStream.write(headersCsvBuilder.toString().getBytes());
    }
}
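As noted above, the quoting in prepareCsv only covers commas. A fuller escape routine would also handle embedded quotes and newlines per RFC 4180. The helper below is my own sketch, not part of the linked repo; it could replace the if/else inside the column loop:

// Hypothetical helper, not from the repo: escapes one cell per RFC 4180.
private static String escapeCsvCell(String value) {
    if (value == null) {
        return ""; // render NULL as an empty cell rather than the literal "null"
    }
    boolean needsQuoting = value.contains(",") || value.contains("\"")
            || value.contains("\n") || value.contains("\r");
    if (!needsQuoting) {
        return value;
    }
    // double any embedded quotes, then wrap the whole cell in quotes
    return "\"" + value.replace("\"", "\"\"") + "\"";
}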
To test this, execute http://localhost:8080/stream-demo/employees/3/cars
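For example, assuming the application runs locally under the stream-demo context path as in that URL: curl -v -o cars.zip http://localhost:8080/stream-demo/employees/3/cars and then unzip -l cars.zip to confirm the archive contains one CSV part per 9 data rows, each with its own header line.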