得到一个 Spliterator
从 Stream
管道可能返回streamspliterators.wrappingspliterator的示例。例如,获取以下内容 Spliterator
:
Spliterator<String> source = new Random()
.ints(11, 0, 7) // size, origin, bound
.filter(nr -> nr % 2 != 0)
.mapToObj(Integer::toString)
.spliterator();
鉴于上述情况 Spliterator<String> source
,当我们通过 tryAdvance (Consumer<? super P_OUT> consumer)
方法 Spliterator
,在本例中是streamspliterators.wrappingspliterator的一个示例,它将首先将项累积到内部缓冲区中,然后再使用这些项,正如我们在streamspliterators.java#298中看到的那样。从简单的Angular 来看 doAdvance()
先将项目插入到 buffer
然后它得到下一个项目并将其传递给 consumer.accept (…)
.
public boolean tryAdvance(Consumer<? super P_OUT> consumer) {
boolean hasNext = doAdvance();
if (hasNext)
consumer.accept(buffer.get(nextToConsume));
return hasNext;
}
然而,我不明白这个的必要性 buffer
.
在这种情况下,为什么 consumer
的参数 tryAdvance
不是简单地用作终端 Sink
管道的压力?
4条答案
按热度按时间bq3bfh9z1#
Spliterators
设计用于按相遇顺序处理每个项目的顺序处理,以及按某种顺序处理项目的并行处理。每种方法Spliterator
必须能够支持早期绑定和后期绑定。缓冲旨在将数据收集到合适的、可处理的块中,这些块遵循排序、并行化和可变性的要求。换句话说,
tryAdvance()
不是类中唯一的方法,其他方法必须相互协作才能传递外部契约。要做到这一点,面对可能重写某些或所有方法的子类,需要每个方法遵守其内部约定。czq61nw12#
这是我从霍尔格那里读到的一些文章,我在这里总结一下;如果有一个确切的重复(我会努力找到一个)-我会关闭并删除我的答案,关于这一个。
首先,为什么
WrappingSpliterator
对于像这样的有状态操作,首先需要sorted
,distinct
等等,但我想你已经明白了。我认为flatMap
而且-因为它是渴望的。现在,当你打电话的时候
spliterator
,如果没有有状态的操作,就没有真正的理由将其 Package 到WrappingSpliterator
显然,但目前这还没有完成。这可能会在未来的版本中改变-在那里他们可以检测到是否有stateful operations
在你打电话之前spliterator
; 但他们现在不这么做,只是将每个操作都视为有状态的,从而将其 Package 到WrappingSpliterator
ebdffaop3#
请记住,这是
Spliterator
由public
方法Stream.spliterator()
,因此不能对调用者进行任何假设(只要它在合同中)。这个
tryAdvance
方法可能会为流的每个元素调用一次,并再次检测流的结尾,实际上,它可能会被调用任意次数,甚至在到达结尾之后。而且也不能保证呼叫者总是经过同一个消费者。要将使用者直接传递给源拆分器而无需缓冲,您必须组成一个将执行所有管道阶段的使用者,即调用Map函数并使用其结果或测试 predicate ,如果为负,则不调用下游使用者,依此类推。传递给源拆分器的使用者还将负责通知
WrappingSpliterator
一个值作为源拆分器的值被过滤器拒绝tryAdvance
方法仍返回true
在这种情况下,操作将不得不重复。正如eugene正确地提到的,这是一个一刀切的实现,不考虑有多少或什么样的管道阶段。组成这样一个消费者的成本可能很高,而且可能需要为每一个消费者重新申请
tryAdvance
调用,读取每个流元素,例如,当不同的消费者被传递到tryAdvance
或者当平等检查不起作用时。请记住,使用者通常实现为lambda表达式,并且lambda表达式生成的示例的标识或相等性未指定。所以
tryAdvance
实现通过在第一次调用时只组成一个使用者示例来避免这些开销,如果没有被过滤器拒绝,该示例将始终将元素存储到同一个缓冲区中,也在第一次调用时分配。请注意,在正常情况下,缓冲区将只保存一个元素。阿法伊克,flatMap
是唯一可以将更多元素推送到缓冲区的操作。但要注意的是,这种非懒惰行为的存在flatMap
这也是为什么需要这种缓冲策略的原因,至少在flatMap
以确保Spliterator
执行由public
方法将实现在一次调用期间最多向使用者传递一个元素的约定tryAdvance
.相反,当你打电话的时候
forEachRemaining
,这些问题并不存在。只有一个Consumer
整个操作过程中的示例以及flatMap
也没关系,因为所有元素都会被消耗掉。因此,将尝试非缓冲传输,只要之前没有tryAdvance
调用可能导致某些元素缓冲:如你所见,只要
buffer
尚未初始化,即没有以前的tryAdvance
打了个电话,consumer::accept
绑定为Sink
完全直接转让。p3rjfoxz4#
我基本上同意霍尔格的回答,但我会用不同的口音。我认为您很难理解对缓冲区的需求,因为您对streamapi所允许的内容有非常简单的心理模型。如果一个人认为流是一系列的
map
以及filter
,不需要额外的缓冲区,因为这些操作有两个重要的“良好”属性:一次只处理一个元素
结果产生0或1个元素
但一般情况下并非如此。正如@holger(我在最初的回答中提到的)所说的,已经有了
flatMap
在Java8中,它打破了规则2,在Java9中,它们最终添加了takewhile,而这实际上是整体上的转换Stream
->Stream
而不是在每个元素的基础上(这是第一个中间衬衫电路操作)。另一点我不太同意@holger的观点是,我认为最根本的原因与他在第二段(即a)中提出的原因有点不同
tryAdvance
在会议结束后Stream
多次和b)“不能保证呼叫者总是经过同一个消费者”)。我认为最重要的原因是Spliterator
在功能上等同于Stream
必须支持短路和惰性(即不处理整个过程的能力Stream
或者它不能支持未绑定的流)。换句话说,即使spliteratorapi(非常奇怪)要求您必须使用相同的Consumer
对象的所有方法的所有调用Spliterator
,你仍然需要tryAdvance
然后呢tryAdvance
实现仍然需要使用一些缓冲区。你不能停止处理数据forEachRemaining(Consumer<? super T> )
所以你不能实现类似于findFirst
或者takeWhile
使用它。实际上,这也是jdk实现内部使用Sink
接口而不是Consumer
(和什么“ Package ”wrapAndCopyInto
代表):Sink
有额外的boolean cancellationRequested()
方法。总而言之:缓冲区是必需的,因为我们想要
Spliterator
:使用简单
Consumer
无法报告处理/取消的后端提供通过(逻辑)使用者的请求停止数据处理的方法。
请注意,这两个要求实际上有点矛盾。
示例和一些代码
在这里,我想提供一些代码的例子,我认为在给定当前api契约(接口)的情况下,没有额外的缓冲区是不可能实现的。这个例子是以你的例子为基础的。
有一个简单的collatz整数序列,可以推测它最终总是到达1。afaik这个猜想还没有被证明,但是对于许多整数(至少对于整个32位整数范围)已经得到了验证。
所以假设我们要解决的问题是:从1到1000000范围内的collatz序列流中,找到第一个十进制表示中包含“123”的序列。
这里有一个解决方案,只使用
Stream
(不是Spliterator
):哪里
collatzSequence
是一个返回Stream
包含collatz序列直到第一个1(对于挑剔者,当当前值大于Integer.MAX_VALUE /3
所以我们不打溢出)。诸如此类
Stream
退回人collatzSequence
是绑定的。也是标准的Random
最终将生成所提供范围内的每个数字。这意味着我们可以保证流中最终会有一些“好”的数字(例如123
)以及findFirst
短路,所以整个操作实际上会终止。然而,没有一个合理的流api实现可以预测这一点。现在让我们假设出于某种奇怪的原因,您希望使用中间层执行相同的操作
Spliterator
. 即使你只有一个逻辑,不需要不同的逻辑Consumer
s、 你不能使用forEachRemaining
. 所以你必须这样做:同样重要的是,对于一些起始数字,collatz序列将包含几个匹配的数字。例如,两者
41123
以及123370
(=41123*3+1)包含“123”。这意味着我们真的不想Consumer
被称为后匹配的第一个命中。但是自从Consumer
不会暴露任何方法来报告处理结束,WrappingSpliterator
不能就这样通过我们的考试Consumer
到里面去Spliterator
. 唯一的解决办法是积累所有的内部结果flatMap
(所有后处理)放入某个缓冲区,然后一次遍历该缓冲区中的一个元素。