How do I export a huge result set from a database into several CSV files and zip them on the fly?

kupeojn6  posted on 2023-06-03  in  Other

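I need to create a REST controller that fetches data from a database and writes it to CSV files, which are then zipped together. Each CSV file should contain exactly 10 rows, and all of the CSV files should end up in a single zip file. I want everything to happen on the fly, which means that saving files to a temporary location on disk is not an option. Can someone give me an example?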

smdncfj3

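I found some very good code for exporting a large number of rows from a database into several CSV files and zipping them. I think it can help a lot of developers. I have tested the solution, and the complete example is available at https://github.com/idaamit/stream-from-db/tree/master

The controller is: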

@GetMapping(value = "/employees/{employeeId}/cars")
@ResponseStatus(HttpStatus.OK)
public ResponseEntity<StreamingResponseBody> getEmployeeCars(@PathVariable int employeeId) {
    log.info("Going to export cars for employee {}", employeeId);
    String zipFileName = "Cars Of Employee - " + employeeId;
    return ResponseEntity.ok()
            .header(HttpHeaders.CONTENT_TYPE, "application/zip")
            // Quote the file name because it contains spaces.
            .header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + zipFileName + ".zip\"")
            // The body is produced while the response is being written, not up front.
            .body(employee.getCars(dataSource, employeeId));
}

The Employee class first checks whether more than one CSV file needs to be prepared:

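import java.io.IOException;
import java.io.OutputStream;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementSetter;
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
// BasicDataSource comes from Apache Commons DBCP; import it from the version your project uses.

public class Employee {

    public StreamingResponseBody getCars(BasicDataSource dataSource, int employeeId) {
        StreamingResponseBody streamingResponseBody = new StreamingResponseBody() {
            @Override
            public void writeTo(OutputStream outputStream) throws IOException {
                // Runs while the HTTP response is being written.
                JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
                String sqlQuery = "SELECT [Id], [employeeId],  [type], [text1] " +
                        "FROM Cars " +
                        "WHERE EmployeeID=? ";
                PreparedStatementSetter preparedStatementSetter = new PreparedStatementSetter() {
                    public void setValues(PreparedStatement preparedStatement) throws SQLException {
                        preparedStatement.setInt(1, employeeId);
                    }
                };
                StreamingZipResultSetExtractor zipExtractor = new StreamingZipResultSetExtractor(
                        outputStream, employeeId, isMoreThanOneFile(jdbcTemplate, employeeId));
                // The extractor writes the zip straight to the response stream.
                jdbcTemplate.query(sqlQuery, preparedStatementSetter, zipExtractor);
            }
        };
        return streamingResponseBody;
    }

    // A file holds MAX_ROWS_IN_CSV lines including its header, so a result with
    // that many data rows already spills into a second file.
    private boolean isMoreThanOneFile(JdbcTemplate jdbcTemplate, int employeeId) {
        Integer numberOfCars = getCount(jdbcTemplate, employeeId);
        return numberOfCars >= StreamingZipResultSetExtractor.MAX_ROWS_IN_CSV;
    }

    private Integer getCount(JdbcTemplate jdbcTemplate, int employeeId) {
        String sqlQuery = "SELECT count([Id]) " +
                "FROM Cars " +
                "WHERE EmployeeID=? ";
        // Varargs overload; the Object[] variant is deprecated in recent Spring versions.
        return jdbcTemplate.queryForObject(sqlQuery, Integer.class, employeeId);
    }

}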
The StreamingZipResultSetExtractor class splits the streamed CSV data into several files and zips them.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.sql.ResultSet;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.ResultSetExtractor;

@Slf4j
public class StreamingZipResultSetExtractor implements ResultSetExtractor<Integer> {
    private final static int CHUNK_SIZE = 100000;
    // Maximum number of lines per CSV file, counting the header line.
    public final static int MAX_ROWS_IN_CSV = 10;
    private OutputStream outputStream;
    private int employeeId;
    private StreamingCsvResultSetExtractor streamingCsvResultSetExtractor;
    private boolean isInteractionCountExceedsLimit;
    private int fileCount = 0;

    public StreamingZipResultSetExtractor(OutputStream outputStream, int employeeId, boolean isInteractionCountExceedsLimit) {
        this.outputStream = outputStream;
        this.employeeId = employeeId;
        this.streamingCsvResultSetExtractor = new StreamingCsvResultSetExtractor(employeeId);
        this.isInteractionCountExceedsLimit = isInteractionCountExceedsLimit;
    }

    @Override
    @SneakyThrows
    public Integer extractData(ResultSet resultSet) throws DataAccessException {
        log.info("Creating thread to extract data as zip file for employeeId {}", employeeId);
        int lineCount = 1; //+1 for header row
        try (PipedOutputStream internalOutputStream = streamingCsvResultSetExtractor.extractData(resultSet);
             PipedInputStream pipedInputStream = new PipedInputStream(internalOutputStream);
             // Decoded with the platform default charset, matching getBytes() on the writing side.
             BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(pipedInputStream))) {

            String currentLine;
            String header = bufferedReader.readLine() + "\n";
            try (ZipOutputStream zipOutputStream = new ZipOutputStream(outputStream)) {
                createFile(employeeId, zipOutputStream, header);
                while ((currentLine = bufferedReader.readLine()) != null) {
                    if (lineCount % MAX_ROWS_IN_CSV == 0) {
                        // The current file is full: close it and start the next one.
                        zipOutputStream.closeEntry();
                        createFile(employeeId, zipOutputStream, header);
                        lineCount++; // header line of the new file
                    }
                    lineCount++;
                    currentLine += "\n";
                    zipOutputStream.write(currentLine.getBytes(StandardCharsets.UTF_8));
                    if (lineCount % CHUNK_SIZE == 0) {
                        zipOutputStream.flush();
                    }
                }
            }
        } catch (IOException e) {
            log.error("Task {} could not zip search results", employeeId, e);
        }

        // lineCount includes one header line per file, so subtract them to get the data lines.
        log.info("Finished zipping all lines to {} file(s) - total of {} lines of data for task {}", fileCount, lineCount - fileCount, employeeId);
        return lineCount;
    }

    private void createFile(int employeeId, ZipOutputStream zipOutputStream, String header) {
        String fileName = "Cars for Employee - " + employeeId;
        fileCount++; // counted for every file so the final log line is accurate
        if (isInteractionCountExceedsLimit) {
            fileName += " Part " + fileCount;
        }
        try {
            zipOutputStream.putNextEntry(new ZipEntry(fileName + ".csv"));
            zipOutputStream.write(header.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            log.error("Could not create new zip entry for task {} ", employeeId, e);
        }
    }

}
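
To see the splitting idea in isolation, here is a minimal sketch that uses only the JDK. It is not the code from the repository: the class name `ZipSplitSketch`, the method `writeSplitZip`, and the sample rows are made up for illustration, and unlike `MAX_ROWS_IN_CSV` above, `rowsPerFile` here counts data rows only (the header is added on top).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

// Hypothetical helper, for illustration only.
public class ZipSplitSketch {

    /**
     * Streams rows into a zip written directly to out. Every CSV entry gets the
     * header plus at most rowsPerFile data rows. Returns the number of entries.
     */
    public static int writeSplitZip(OutputStream out, String header, Iterator<String> rows,
                                    int rowsPerFile, String baseName) throws IOException {
        int fileCount = 0;
        int rowsInCurrentFile = 0;
        try (ZipOutputStream zip = new ZipOutputStream(out, StandardCharsets.UTF_8)) {
            while (rows.hasNext()) {
                if (fileCount == 0 || rowsInCurrentFile == rowsPerFile) {
                    // putNextEntry closes the previous entry if one is still open.
                    fileCount++;
                    zip.putNextEntry(new ZipEntry(baseName + " Part " + fileCount + ".csv"));
                    zip.write((header + "\n").getBytes(StandardCharsets.UTF_8));
                    rowsInCurrentFile = 0;
                }
                zip.write((rows.next() + "\n").getBytes(StandardCharsets.UTF_8));
                rowsInCurrentFile++;
            }
        }
        return fileCount;
    }

    public static void main(String[] args) throws IOException {
        // Made-up sample data; in the real code the rows come from the ResultSet.
        List<String> rows = List.of("1,3,sedan,a", "2,3,suv,b", "3,3,van,c", "4,3,coupe,d", "5,3,truck,e");
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        int files = writeSplitZip(sink, "Id,employeeId,type,text1", rows.iterator(), 2, "Cars");
        System.out.println(files + " entries written, " + sink.size() + " bytes");
    }
}
```

Because the rows arrive through an `Iterator` and go straight into the `ZipOutputStream`, only one row is held in memory at a time, which is the same property the extractor above relies on.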
The StreamingCsvResultSetExtractor class writes the data from the result set out as CSV. More work is still needed to handle special characters that are problematic inside CSV cells.

import java.io.PipedOutputStream;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.ResultSetExtractor;

@Slf4j
public class StreamingCsvResultSetExtractor implements ResultSetExtractor<PipedOutputStream> {
    private final static int CHUNK_SIZE = 100000;
    private PipedOutputStream pipedOutputStream;
    private final int employeeId;

    public StreamingCsvResultSetExtractor(int employeeId) {
        this.employeeId = employeeId;
    }

    @SneakyThrows
    @Override
    public PipedOutputStream extractData(ResultSet resultSet) throws DataAccessException {
        log.info("Creating thread to extract data as csv and save to file for task {}", employeeId);
        this.pipedOutputStream = new PipedOutputStream();
        // The caller must connect a PipedInputStream to the returned stream;
        // writes fail with "Pipe not connected" until it has done so.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> {
            prepareCsv(resultSet);
        });
        // Let the worker thread exit once the task is done instead of leaking it.
        executor.shutdown();

        return pipedOutputStream;
    }

    @SneakyThrows
    private Integer prepareCsv(ResultSet resultSet) {
        int interactionsSent = 1;
        log.info("starting to extract data to csv lines");
        StringBuilder csvRowBuilder = new StringBuilder();
        try {
            // Inside the try block so the pipe is closed even if writing the header fails.
            streamHeaders(resultSet.getMetaData());
            int columnCount = resultSet.getMetaData().getColumnCount();
            while (resultSet.next()) {
                for (int i = 1; i < columnCount + 1; i++) {
                    String value = resultSet.getString(i);
                    if (value != null && (value.contains(",") || value.contains("\""))) {
                        // Quote the cell and double embedded quotes. Cells that
                        // contain line breaks are still not handled.
                        csvRowBuilder.append("\"").append(value.replace("\"", "\"\"")).append("\"");
                    } else {
                        // A null value is written as the text "null".
                        csvRowBuilder.append(value);
                    }
                    csvRowBuilder.append(",");
                }
                // Replace the trailing comma with a line break.
                int rowLength = csvRowBuilder.length();
                csvRowBuilder.replace(rowLength - 1, rowLength, "\n");

                pipedOutputStream.write(csvRowBuilder.toString().getBytes());
                interactionsSent++;
                csvRowBuilder.setLength(0);
                if (interactionsSent % CHUNK_SIZE == 0) {
                    pipedOutputStream.flush();
                }
            }
        } finally {
            // Closing the pipe signals end of stream to the reading side.
            pipedOutputStream.flush();
            pipedOutputStream.close();
        }

        log.debug("Created all csv lines for Task {} - total of {} rows", employeeId, interactionsSent);
        return interactionsSent;
    }

    @SneakyThrows
    private void streamHeaders(ResultSetMetaData resultSetMetaData) {
        StringBuilder headersCsvBuilder = new StringBuilder();

        for (int i = 1; i < resultSetMetaData.getColumnCount() + 1; i++) {
            headersCsvBuilder.append(resultSetMetaData.getColumnLabel(i)).append(",");
        }
        // Replace the trailing comma with a line break.
        int rowLength = headersCsvBuilder.length();
        headersCsvBuilder.replace(rowLength - 1, rowLength, "\n");

        pipedOutputStream.write(headersCsvBuilder.toString().getBytes());
    }

}
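
As a starting point for the special-character handling mentioned above, here is a minimal sketch of the usual CSV quoting rule: wrap a cell in double quotes when it contains a comma, a double quote, or a line break, and double any embedded quotes. The class `CsvEscapeSketch` and its methods are hypothetical names, and writing a null value as an empty cell is an assumption of this sketch, not something the original code does.

```java
// Hypothetical helper, for illustration only.
public class CsvEscapeSketch {

    /** Returns the cell text, quoted and with embedded quotes doubled when needed. */
    public static String escape(String value) {
        if (value == null) {
            return ""; // assumption: null becomes an empty cell
        }
        boolean needsQuotes = value.indexOf(',') >= 0
                || value.indexOf('"') >= 0
                || value.indexOf('\n') >= 0
                || value.indexOf('\r') >= 0;
        if (!needsQuotes) {
            return value;
        }
        return "\"" + value.replace("\"", "\"\"") + "\"";
    }

    /** Joins the escaped cells with commas; no line terminator is appended. */
    public static String toCsvLine(String... fields) {
        StringBuilder line = new StringBuilder();
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) {
                line.append(',');
            }
            line.append(escape(fields[i]));
        }
        return line.toString();
    }

    public static void main(String[] args) {
        System.out.println(toCsvLine("1", "3", "sedan", "red, with \"sport\" trim"));
    }
}
```

Note that a quoted cell containing a line break would still be cut in two by any code that splits the stream with `BufferedReader.readLine()`, so the file-splitting step would need to count records rather than physical lines to support such values.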

To test this, request http://localhost:8080/stream-demo/employees/3/cars
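
Once the response has been saved to a file, one way to check the result is to list the entries in the zip and count the lines in each. The sketch below assumes the download was saved locally (the path is passed as the first argument); `ZipInspectSketch` and `countLinesPerEntry` are hypothetical names used only for this illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

// Hypothetical helper, for illustration only.
public class ZipInspectSketch {

    /** Maps each entry name to the number of lines it contains, in zip order. */
    public static Map<String, Integer> countLinesPerEntry(InputStream zipBytes) throws IOException {
        Map<String, Integer> result = new LinkedHashMap<>();
        try (ZipInputStream zip = new ZipInputStream(zipBytes, StandardCharsets.UTF_8)) {
            ZipEntry entry;
            while ((entry = zip.getNextEntry()) != null) {
                // Not closed on purpose: closing the reader would close the whole zip stream.
                BufferedReader reader = new BufferedReader(new InputStreamReader(zip, StandardCharsets.UTF_8));
                int lines = 0;
                while (reader.readLine() != null) {
                    lines++;
                }
                result.put(entry.getName(), lines);
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java ZipInspectSketch <path-to-zip>");
            return;
        }
        try (InputStream in = Files.newInputStream(Paths.get(args[0]))) {
            countLinesPerEntry(in).forEach((name, lines) -> System.out.println(name + ": " + lines + " lines"));
        }
    }
}
```

With the code above, every file except possibly the last should report 10 lines, the header included.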
