java.util.stream 结果集的示例网站

vxqlmq5t  于 2023-01-01  发布在  Java
关注(0)|答案(7)|浏览(112)

我有几个数据量很大的表(大约1亿条记录)。所以我不能将这些数据存储在内存中,但是我想使用java.util.stream类流传输这个结果集,并将这个流传递给另一个类。我读过关于Stream.ofStream.Builder运算符的文章,但是它们是内存中的缓冲流。那么有什么方法可以解决这个问题吗?

更新#1

好的,我在谷歌上找到了jooq库。我不确定,但看起来它可能适用于我的测试用例。总结一下,我有几个表有大量的数据。我想流我的结果集,并将此流转移到另一个方法。类似这样:

// why return Stream<String>? Because my result set has String type
private Stream<Record> writeTableToStream(DataSource dataSource, String table) {

    Stream<Record> record = null;
    try (Connection connection = dataSource.getConnection()) {
        String sql = "select * from " + table;

        try (PreparedStatement pSt = connection.prepareStatement(sql)) {
            connection.setAutoCommit(false);
            pSt.setFetchSize(5000);
            ResultSet resultSet = pSt.executeQuery();
            //
            record = DSL.using(connection)
                    .fetch(resultSet).stream();
        }
    } catch (SQLException sqlEx) {
        logger.error(sqlEx);
    }
    
    return record;
}

有人能告诉我,我走的路对吗?谢谢。

更新#2

我在jooq上做了一些实验,现在可以说上面的决定不适合我。这个代码record = DSL.using(connection).fetch(resultSet).stream();花费太多时间

jvlzgdj9

jvlzgdj91#

你首先要理解的是代码

try (Connection connection = dataSource.getConnection()) {
    …
    try (PreparedStatement pSt = connection.prepareStatement(sql)) {
        …
        return stream;
    }
}

不起作用,因为当您离开try块时,资源已关闭,而Stream的处理甚至还没有开始。
资源管理构造“try with resources”适用于在方法内的块范围内使用的资源,但您正在创建一个返回资源的工厂方法。因此,您必须确保关闭返回流将关闭资源,并且调用方负责关闭Stream
此外,您还需要一个函数,它可以从ResultSet的一行中生成一个项。

Record createRecord(ResultSet rs) {
    …
}

你可以创建一个Stream<Record>,基本上就像

Stream<Record> stream = StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(
    Long.MAX_VALUE,Spliterator.ORDERED) {
        @Override
        public boolean tryAdvance(Consumer<? super Record> action) {
            if(!resultSet.next()) return false;
            action.accept(createRecord(resultSet));
            return true;
        }
    }, false);

但是要正确地执行它,您必须合并异常处理和资源关闭。您可以使用Stream.onClose来注册一个操作,该操作将在Stream关闭时执行。但它必须是不能抛出检查异常的Runnable。同样,tryAdvance方法也不允许抛出检查异常。由于我们不能'不要在这里简单地嵌套try(…)块,当已经有一个挂起的异常时,在close中抛出的抑制异常的程序逻辑不是免费的。
为了帮助我们,我们引入了一个新的类型,它可以 Package 可能抛出检查异常的关闭操作,并将它们 Package 在一个未检查异常中。通过实现AutoCloseable本身,它可以利用try(…)构造安全地链接关闭操作:

interface UncheckedCloseable extends Runnable, AutoCloseable {
    default void run() {
        try { close(); } catch(Exception ex) { throw new RuntimeException(ex); }
    }
    static UncheckedCloseable wrap(AutoCloseable c) {
        return c::close;
    }
    default UncheckedCloseable nest(AutoCloseable c) {
        return ()->{ try(UncheckedCloseable c1=this) { c.close(); } };
    }
}

这样,整个操作变为:

private Stream<Record> tableAsStream(DataSource dataSource, String table)
    throws SQLException {

    UncheckedCloseable close=null;
    try {
        Connection connection = dataSource.getConnection();
        close=UncheckedCloseable.wrap(connection);
        String sql = "select * from " + table;
        PreparedStatement pSt = connection.prepareStatement(sql);
        close=close.nest(pSt);
        connection.setAutoCommit(false);
        pSt.setFetchSize(5000);
        ResultSet resultSet = pSt.executeQuery();
        close=close.nest(resultSet);
        return StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(
            Long.MAX_VALUE,Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super Record> action) {
                try {
                    if(!resultSet.next()) return false;
                    action.accept(createRecord(resultSet));
                    return true;
                } catch(SQLException ex) {
                    throw new RuntimeException(ex);
                }
            }
        }, false).onClose(close);
    } catch(SQLException sqlEx) {
        if(close!=null)
            try { close.close(); } catch(Exception ex) { sqlEx.addSuppressed(ex); }
        throw sqlEx;
    }
}

此方法将所有资源ConnectionStatementResultSet的必要关闭操作 Package 在上述实用程序类的一个示例中。如果在初始化期间发生异常,则立即执行关闭操作并将异常传递给调用方。如果流构造成功,则通过onClose注册关闭操作。
因此,调用者必须确保正确关闭,如

try(Stream<Record> s=tableAsStream(dataSource, table)) {
    // stream operation
}

请注意,通过RuntimeException传递SQLException也已添加到tryAdvance方法中。因此,现在可以将throws SQLException添加到createRecord方法中而不会出现问题。

v8wbuo2f

v8wbuo2f2#

jOQ

我将回答你问题中的jOOQ部分,从jOOQ3.8开始,已经有了很多与jOOQ和Stream.Other usages are also documented on this jOOQ page结合相关的附加特性。

您的建议用法:

你试过了:

Stream<Record> stream = DSL.using(connection).fetch(resultSet).stream();

实际上,这对于大型结果集来说效果不好,因为fetch(ResultSet)将整个结果集提取到内存中,然后在其上调用Collection.stream()

更好的(惰性)用法:

相反,您可以这样写:

try (Stream<Record> stream = DSL.using(connection).fetchStream(resultSet)) {
    ...
}

......这基本上是为了方便:

try (Cursor<Record> cursor = DSL.using(connection).fetchLazy(resultSet)) {
    Stream<Record> stream = cursor.stream();
    ...
}

另请参见DSLContext.fetchStream(ResultSet)
当然,您也可以让jOOQ执行您的SQL字符串,而不是纠结于JDBC:

try (Stream<Record> stream = 
     DSL.using(dataSource)
        .resultQuery("select * from {0}", DSL.name(table)) // Prevent SQL injection
        .fetchSize(5000)
        .fetchStream()) {
    ...
}

可怕的SELECT *
正如评论中批评的那样,他们的jOOQ使用似乎很慢,因为jOOQ急切地将LOB数据提取到内存中,* 尽管 * 使用了fetchLazy()。单词"lazy"对应于懒惰地提取记录(一个接一个),而不是懒惰地提取列数据。一个记录一次提取完成,假设您实际上 * 想要 * 投影整行。
如果您不需要一些较重的行,就不要投影它们! SELECT *在SQL中几乎总是一个坏主意。

  • 它会在数据库服务器、网络和客户机中产生更多的I/O和内存开销。
  • 它防止覆盖索引使用
  • 它可防止连接消除转换

More info in this blog post here.

尝试使用资源时

请注意,jOOQ生成的Stream是"资源丰富的",也就是说,它包含了一个对已打开的ResultSet(和PreparedStatement)的引用。因此,如果您真的想在方法之外返回该流,请确保它已正确关闭!

1szpjjfi

1szpjjfi3#

我不知道有哪个著名的图书馆会为你做这件事。
也就是说,this article展示了如何用一个Iterator(ResultSetIterator) Package 结果集,并将其作为第一个参数传递给Spliterators.spliteratorUnknownSize(),以创建一个Spliterator
然后StreamSupport可以使用Spliterator在其上创建Stream。
他们建议的ResultSetIterator类实现:

public class ResultSetIterator implements Iterator {

    private ResultSet rs;
    private PreparedStatement ps;
    private Connection connection;
    private String sql;

    public ResultSetIterator(Connection connection, String sql) {
        assert connection != null;
        assert sql != null;
        this.connection = connection;
        this.sql = sql;
    }

    public void init() {
        try {
            ps = connection.prepareStatement(sql);
            rs = ps.executeQuery();

        } catch (SQLException e) {
            close();
            throw new DataAccessException(e);
        }
    }

    @Override
    public boolean hasNext() {
        if (ps == null) {
            init();
        }
        try {
            boolean hasMore = rs.next();
            if (!hasMore) {
                close();
            }
            return hasMore;
        } catch (SQLException e) {
            close();
            throw new DataAccessException(e);
        }

    }

    private void close() {
        try {
            rs.close();
            try {
                ps.close();
            } catch (SQLException e) {
                //nothing we can do here
            }
        } catch (SQLException e) {
            //nothing we can do here
        }
    }

    @Override
    public Tuple next() {
        try {
            return SQL.rowAsTuple(sql, rs);
        } catch (DataAccessException e) {
            close();
            throw e;
        }
    }
}

然后:

public static Stream stream(final Connection connection, 
                                       final String sql, 
                                       final Object... parms) {
  return StreamSupport
                .stream(Spliterators.spliteratorUnknownSize(
                        new ResultSetIterator(connection, sql), 0), false);
}
8yoxcaq7

8yoxcaq74#

下面是abacus-jdbc的最简单示例。

final DataSource ds = JdbcUtil.createDataSource(url, user, password);
final SQLExecutor sqlExecutor = new SQLExecutor(ds);
sqlExecutor.stream(sql, parameters).filter(...).map(...).collect(...) // lazy execution&loading and auto-close Statement/Connection

或者:

JdbcUtil.prepareQuery(ds, sql)
            .stream(ResultRecord.class) // or RowMapper.MAP/...
            .filter(...).map(...).collect(...)  // lazy execution&loading and auto-close Statement/Connection

这完全是延迟加载和自动关闭。记录将通过fetch size(如果未指定,则为默认值)从数据库加载,并且语句和连接将在收集结果/记录后自动关闭。
披露:我是AbacusUtil的开发人员。

kx1ctssn

kx1ctssn5#

使用我的库,它会这样做:
附加Maven依赖项:

<dependency>
    <groupId>com.github.buckelieg</groupId>
    <artifactId>db-fn</artifactId>
    <version>0.3.4</version>
</dependency>

在代码中使用库:

Function<Stream<I>, O> processor = stream -> //process input stream
try (DB db = new DB("jdbc:postgresql://host:port/database?user=user&password=pass")) {
    processor.apply(
        db.select("SELECT * FROM my_table t1 JOIN my_table t2 ON t1.id = t2.id")
          .fetchSize(5000)
          .execute(rs -> /*ResultSet mapper*/)
    );
}

查看更多here

qyzbxkaa

qyzbxkaa6#

Ujorm框架中的某个名为Tools的公共模块使用RowIterator类提供了一个简单的解决方案用途:

PreparedStatement ps = dbConnection.prepareStatement("SELECT * FROM myTable");
    new RowIterator(ps).toStream().forEach((RsConsumer)(resultSet) -> {
        int value = resultSet.getInt(1);
    });

Maven对工具库的依赖性(50KB):

<dependency>
        <groupId>org.ujorm</groupId>
        <artifactId>ujo-tools</artifactId>
        <version>1.93</version>
    </dependency>

有关详细信息,请参见jUnit测试。

i34xakig

i34xakig7#

我只是做了总结,以提供有关如何在不使用3rd click here for detail的情况下流式传输ResultSet和执行简单SQL查询的真实的示例
Blockquote:Java 8提供了Stream家族,并且易于操作。管道的使用方式使得代码清晰智能。然而,ResultSet仍然是非常传统的处理方式。根据实际的ResultSet使用情况,如果转换为Stream,将非常有帮助。
.... StreamUtils.uncheckedConsumer需要将SQLException转换为runtimeException以使Lamda清晰。

相关问题