为什么stream.spliterator()的tryadvance会将项累积到缓冲区中?

kxeu7u2r  于 2021-07-09  发布在  Java
关注(0)|答案(4)|浏览(404)

得到一个 SpliteratorStream 管道可能返回streamspliterators.wrappingspliterator的示例。例如,获取以下内容 Spliterator :

Spliterator<String> source = new Random()
            .ints(11, 0, 7) // size, origin, bound
            .filter(nr -> nr % 2 != 0)
            .mapToObj(Integer::toString)
            .spliterator();

鉴于上述情况 Spliterator<String> source ,当我们通过 tryAdvance (Consumer<? super P_OUT> consumer) 方法 Spliterator ,在本例中是streamspliterators.wrappingspliterator的一个示例,它将首先将项累积到内部缓冲区中,然后再使用这些项,正如我们在streamspliterators.java#298中看到的那样。从简单的Angular 来看 doAdvance() 先将项目插入到 buffer 然后它得到下一个项目并将其传递给 consumer.accept (…) .

public boolean tryAdvance(Consumer<? super P_OUT> consumer) {
    boolean hasNext = doAdvance();
    if (hasNext)
        consumer.accept(buffer.get(nextToConsume));
    return hasNext;
}

然而,我不明白这个的必要性 buffer .
在这种情况下,为什么 consumer 的参数 tryAdvance 不是简单地用作终端 Sink 管道的压力?

bq3bfh9z

bq3bfh9z1#

Spliterators 设计用于按相遇顺序处理每个项目的顺序处理,以及按某种顺序处理项目的并行处理。每种方法 Spliterator 必须能够支持早期绑定和后期绑定。缓冲旨在将数据收集到合适的、可处理的块中,这些块遵循排序、并行化和可变性的要求。
换句话说, tryAdvance() 不是类中唯一的方法,其他方法必须相互协作才能传递外部契约。要做到这一点,面对可能重写某些或所有方法的子类,需要每个方法遵守其内部约定。

czq61nw1

czq61nw12#

这是我从霍尔格那里读到的一些文章,我在这里总结一下;如果有一个确切的重复(我会努力找到一个)-我会关闭并删除我的答案,关于这一个。
首先,为什么 WrappingSpliterator 对于像这样的有状态操作,首先需要 sorted , distinct 等等,但我想你已经明白了。我认为 flatMap 而且-因为它是渴望的。
现在,当你打电话的时候 spliterator ,如果没有有状态的操作,就没有真正的理由将其 Package 到 WrappingSpliterator 显然,但目前这还没有完成。这可能会在未来的版本中改变-在那里他们可以检测到是否有 stateful operations 在你打电话之前 spliterator ; 但他们现在不这么做,只是将每个操作都视为有状态的,从而将其 Package 到 WrappingSpliterator

ebdffaop

ebdffaop3#

请记住,这是 Spliteratorpublic 方法 Stream.spliterator() ,因此不能对调用者进行任何假设(只要它在合同中)。
这个 tryAdvance 方法可能会为流的每个元素调用一次,并再次检测流的结尾,实际上,它可能会被调用任意次数,甚至在到达结尾之后。而且也不能保证呼叫者总是经过同一个消费者。
要将使用者直接传递给源拆分器而无需缓冲,您必须组成一个将执行所有管道阶段的使用者,即调用Map函数并使用其结果或测试 predicate ,如果为负,则不调用下游使用者,依此类推。传递给源拆分器的使用者还将负责通知 WrappingSpliterator 一个值作为源拆分器的值被过滤器拒绝 tryAdvance 方法仍返回 true 在这种情况下,操作将不得不重复。
正如eugene正确地提到的,这是一个一刀切的实现,不考虑有多少或什么样的管道阶段。组成这样一个消费者的成本可能很高,而且可能需要为每一个消费者重新申请 tryAdvance 调用,读取每个流元素,例如,当不同的消费者被传递到 tryAdvance 或者当平等检查不起作用时。请记住,使用者通常实现为lambda表达式,并且lambda表达式生成的示例的标识或相等性未指定。
所以 tryAdvance 实现通过在第一次调用时只组成一个使用者示例来避免这些开销,如果没有被过滤器拒绝,该示例将始终将元素存储到同一个缓冲区中,也在第一次调用时分配。请注意,在正常情况下,缓冲区将只保存一个元素。阿法伊克, flatMap 是唯一可以将更多元素推送到缓冲区的操作。但要注意的是,这种非懒惰行为的存在 flatMap 这也是为什么需要这种缓冲策略的原因,至少在 flatMap 以确保 Spliterator 执行由 public 方法将实现在一次调用期间最多向使用者传递一个元素的约定 tryAdvance .
相反,当你打电话的时候 forEachRemaining ,这些问题并不存在。只有一个 Consumer 整个操作过程中的示例以及 flatMap 也没关系,因为所有元素都会被消耗掉。因此,将尝试非缓冲传输,只要之前没有 tryAdvance 调用可能导致某些元素缓冲:

public void forEachRemaining(Consumer<? super P_OUT> consumer) {
         if (buffer == null && !finished) {
             Objects.requireNonNull(consumer);
             init();

             ph.wrapAndCopyInto((Sink<P_OUT>) consumer::accept, spliterator);
             finished = true;
         }
         else {
             do { } while (tryAdvance(consumer));
         }
     }

如你所见,只要 buffer 尚未初始化,即没有以前的 tryAdvance 打了个电话, consumer::accept 绑定为 Sink 完全直接转让。

p3rjfoxz

p3rjfoxz4#

我基本上同意霍尔格的回答,但我会用不同的口音。我认为您很难理解对缓冲区的需求,因为您对streamapi所允许的内容有非常简单的心理模型。如果一个人认为流是一系列的 map 以及 filter ,不需要额外的缓冲区,因为这些操作有两个重要的“良好”属性:
一次只处理一个元素
结果产生0或1个元素
但一般情况下并非如此。正如@holger(我在最初的回答中提到的)所说的,已经有了 flatMap 在Java8中,它打破了规则2,在Java9中,它们最终添加了takewhile,而这实际上是整体上的转换 Stream -> Stream 而不是在每个元素的基础上(这是第一个中间衬衫电路操作)。
另一点我不太同意@holger的观点是,我认为最根本的原因与他在第二段(即a)中提出的原因有点不同 tryAdvance 在会议结束后 Stream 多次和b)“不能保证呼叫者总是经过同一个消费者”)。我认为最重要的原因是 Spliterator 在功能上等同于 Stream 必须支持短路和惰性(即不处理整个过程的能力 Stream 或者它不能支持未绑定的流)。换句话说,即使spliteratorapi(非常奇怪)要求您必须使用相同的 Consumer 对象的所有方法的所有调用 Spliterator ,你仍然需要 tryAdvance 然后呢 tryAdvance 实现仍然需要使用一些缓冲区。你不能停止处理数据 forEachRemaining(Consumer<? super T> ) 所以你不能实现类似于 findFirst 或者 takeWhile 使用它。实际上,这也是jdk实现内部使用 Sink 接口而不是 Consumer (和什么“ Package ” wrapAndCopyInto 代表): Sink 有额外的 boolean cancellationRequested() 方法。
总而言之:缓冲区是必需的,因为我们想要 Spliterator :
使用简单 Consumer 无法报告处理/取消的后端
提供通过(逻辑)使用者的请求停止数据处理的方法。
请注意,这两个要求实际上有点矛盾。
示例和一些代码
在这里,我想提供一些代码的例子,我认为在给定当前api契约(接口)的情况下,没有额外的缓冲区是不可能实现的。这个例子是以你的例子为基础的。
有一个简单的collatz整数序列,可以推测它最终总是到达1。afaik这个猜想还没有被证明,但是对于许多整数(至少对于整个32位整数范围)已经得到了验证。
所以假设我们要解决的问题是:从1到1000000范围内的collatz序列流中,找到第一个十进制表示中包含“123”的序列。
这里有一个解决方案,只使用 Stream (不是 Spliterator ):

static String findGoodNumber() {
    return new Random()
            .ints(1, 1_000_000) // unbound!
            .flatMap(nr -> collatzSequence(nr))
            .mapToObj(Integer::toString)
            .filter(s -> s.contains("123"))
            .findFirst().get();
}

哪里 collatzSequence 是一个返回 Stream 包含collatz序列直到第一个1(对于挑剔者,当当前值大于 Integer.MAX_VALUE /3 所以我们不打溢出)。
诸如此类 Stream 退回人 collatzSequence 是绑定的。也是标准的 Random 最终将生成所提供范围内的每个数字。这意味着我们可以保证流中最终会有一些“好”的数字(例如 123 )以及 findFirst 短路,所以整个操作实际上会终止。然而,没有一个合理的流api实现可以预测这一点。
现在让我们假设出于某种奇怪的原因,您希望使用中间层执行相同的操作 Spliterator . 即使你只有一个逻辑,不需要不同的逻辑 Consumer s、 你不能使用 forEachRemaining . 所以你必须这样做:

static Spliterator<String> createCollatzRandomSpliterator() {
    return new Random()
            .ints(1, 1_000_000) // unbound!
            .flatMap(nr -> collatzSequence(nr))
            .mapToObj(Integer::toString)
            .spliterator();
}

static String findGoodNumberWithSpliterator() {
    Spliterator<String> source = createCollatzRandomSpliterator();

    String[] res = new String[1]; // work around for "final" closure restriction

    while (source.tryAdvance(s -> {
        if (s.contains("123")) {
            res[0] = s;
        }
    })) {
        if (res[0] != null)
            return res[0];
    }
    throw new IllegalStateException("Impossible");
}

同样重要的是,对于一些起始数字,collatz序列将包含几个匹配的数字。例如,两者 41123 以及 123370 (=41123*3+1)包含“123”。这意味着我们真的不想 Consumer 被称为后匹配的第一个命中。但是自从 Consumer 不会暴露任何方法来报告处理结束, WrappingSpliterator 不能就这样通过我们的考试 Consumer 到里面去 Spliterator . 唯一的解决办法是积累所有的内部结果 flatMap (所有后处理)放入某个缓冲区,然后一次遍历该缓冲区中的一个元素。

相关问题