我有几个数据量很大的表(大约1亿条记录)。所以我不能将这些数据存储在内存中,但是我想使用java.util.stream
类流传输这个结果集,并将这个流传递给另一个类。我读过关于Stream.of
和Stream.Builder
运算符的文章,但是它们是内存中的缓冲流。那么有什么方法可以解决这个问题吗?
更新#1
好的,我在谷歌上找到了jooq库。我不确定,但看起来它可能适用于我的测试用例。总结一下,我有几个表有大量的数据。我想流我的结果集,并将此流转移到另一个方法。类似这样:
// why return Stream<String>? Because my result set has String type
private Stream<Record> writeTableToStream(DataSource dataSource, String table) {
Stream<Record> record = null;
try (Connection connection = dataSource.getConnection()) {
String sql = "select * from " + table;
try (PreparedStatement pSt = connection.prepareStatement(sql)) {
connection.setAutoCommit(false);
pSt.setFetchSize(5000);
ResultSet resultSet = pSt.executeQuery();
//
record = DSL.using(connection)
.fetch(resultSet).stream();
}
} catch (SQLException sqlEx) {
logger.error(sqlEx);
}
return record;
}
有人能告诉我,我走的路对吗?谢谢。
更新#2
我在jooq上做了一些实验,现在可以说上面的决定不适合我。这个代码record = DSL.using(connection).fetch(resultSet).stream();
花费太多时间
7条答案
按热度按时间jvlzgdj91#
你首先要理解的是代码
不起作用,因为当您离开
try
块时,资源已关闭,而Stream
的处理甚至还没有开始。资源管理构造“try with resources”适用于在方法内的块范围内使用的资源,但您正在创建一个返回资源的工厂方法。因此,您必须确保关闭返回流将关闭资源,并且调用方负责关闭
Stream
。此外,您还需要一个函数,它可以从
ResultSet
的一行中生成一个项。你可以创建一个
Stream<Record>
,基本上就像但是要正确地执行它,您必须合并异常处理和资源关闭。您可以使用
Stream.onClose
来注册一个操作,该操作将在Stream
关闭时执行。但它必须是不能抛出检查异常的Runnable
。同样,tryAdvance
方法也不允许抛出检查异常。由于我们不能'不要在这里简单地嵌套try(…)
块,当已经有一个挂起的异常时,在close
中抛出的抑制异常的程序逻辑不是免费的。为了帮助我们,我们引入了一个新的类型,它可以 Package 可能抛出检查异常的关闭操作,并将它们 Package 在一个未检查异常中。通过实现
AutoCloseable
本身,它可以利用try(…)
构造安全地链接关闭操作:这样,整个操作变为:
此方法将所有资源
Connection
、Statement
和ResultSet
的必要关闭操作 Package 在上述实用程序类的一个示例中。如果在初始化期间发生异常,则立即执行关闭操作并将异常传递给调用方。如果流构造成功,则通过onClose
注册关闭操作。因此,调用者必须确保正确关闭,如
请注意,通过
RuntimeException
传递SQLException
也已添加到tryAdvance
方法中。因此,现在可以将throws SQLException
添加到createRecord
方法中而不会出现问题。v8wbuo2f2#
jOQ
我将回答你问题中的jOOQ部分,从jOOQ3.8开始,已经有了很多与jOOQ和Stream.Other usages are also documented on this jOOQ page结合相关的附加特性。
您的建议用法:
你试过了:
实际上,这对于大型结果集来说效果不好,因为
fetch(ResultSet)
将整个结果集提取到内存中,然后在其上调用Collection.stream()
。更好的(惰性)用法:
相反,您可以这样写:
......这基本上是为了方便:
另请参见
DSLContext.fetchStream(ResultSet)
当然,您也可以让jOOQ执行您的SQL字符串,而不是纠结于JDBC:
可怕的
SELECT *
正如评论中批评的那样,他们的jOOQ使用似乎很慢,因为jOOQ急切地将LOB数据提取到内存中,* 尽管 * 使用了
fetchLazy()
。单词"lazy"对应于懒惰地提取记录(一个接一个),而不是懒惰地提取列数据。一个记录一次提取完成,假设您实际上 * 想要 * 投影整行。如果您不需要一些较重的行,就不要投影它们!
SELECT *
在SQL中几乎总是一个坏主意。More info in this blog post here.
尝试使用资源时
请注意,jOOQ生成的
Stream
是"资源丰富的",也就是说,它包含了一个对已打开的ResultSet
(和PreparedStatement
)的引用。因此,如果您真的想在方法之外返回该流,请确保它已正确关闭!1szpjjfi3#
我不知道有哪个著名的图书馆会为你做这件事。
也就是说,this article展示了如何用一个Iterator(ResultSetIterator) Package 结果集,并将其作为第一个参数传递给
Spliterators.spliteratorUnknownSize()
,以创建一个Spliterator
。然后
StreamSupport
可以使用Spliterator在其上创建Stream。他们建议的
ResultSetIterator
类实现:然后:
8yoxcaq74#
下面是abacus-jdbc的最简单示例。
或者:
这完全是延迟加载和自动关闭。记录将通过
fetch size
(如果未指定,则为默认值)从数据库加载,并且语句和连接将在收集结果/记录后自动关闭。披露:我是AbacusUtil的开发人员。
kx1ctssn5#
使用我的库,它会这样做:
附加Maven依赖项:
在代码中使用库:
查看更多here
qyzbxkaa6#
Ujorm框架中的某个名为Tools的公共模块使用
RowIterator
类提供了一个简单的解决方案用途:Maven对工具库的依赖性(50KB):
有关详细信息,请参见jUnit测试。
i34xakig7#
我只是做了总结,以提供有关如何在不使用3rd click here for detail的情况下流式传输ResultSet和执行简单SQL查询的真实的示例
Blockquote:Java 8提供了Stream家族,并且易于操作。管道的使用方式使得代码清晰智能。然而,ResultSet仍然是非常传统的处理方式。根据实际的ResultSet使用情况,如果转换为Stream,将非常有帮助。
.... StreamUtils.uncheckedConsumer需要将SQLException转换为runtimeException以使Lamda清晰。