使AkkaStreams中的大型流失败最少的元素

qhhrdooz  于 2022-11-06  发布在  其他
关注(0)|答案(1)|浏览(128)

我有以下情况:一个至少有10M元素的流。但是有一个最小值1M,如果没有达到最小值,流必须失败。这个流在Akka Streams中是这样实现的:

Source.range(0,10_000_000).via(someBusiness())
.runWith(Sink.ignore(),actorSystem);

Sink.seq()之类的接收器不是一个选项,因为我不希望内存中有这么大的集合。
我希望任何流,计数元素,并在流结束时验证计数。

vi4fp9gy

vi4fp9gy1#

大致的想法是:

  • 将每个来自源代码的传入元素转换为一个 Package 器(单元素数组可能是Java中最简单的方法(我猜您正在用Java编写此代码))
  • concat-Source.single,其生成易于区分的 Package 器(例如,零元素数组);在主源耗尽之前,流不会流向该源
  • 跟踪元素计数,如果未达到最小元素数,则抛出concat ted元素;否则展开元素

所以类似于(如果Java很糟糕,请道歉)

int[] zeroElems = new int[0];

Source.range(0, 10_000_000)
    .map(x -> {
        int[] wrapped = new int[1];
        wrapped[0] = x;
        return wrapped;
    })
    .concat(Source.single(zeroElems))
    .statefulMapConcat(
        () -> {
            // trick to get around Java prohibition of closing over non-final variables
            final int[] counter = {0};
            return (wrapper) -> {
                if (counter[0] < 1_000_000) {
                    if (wrapper.length < 1) {
                        throw new RuntimeException("Insufficient number of elements in stream");
                    }

                    counter[0]++; 
                }

                return Arrays.stream(wrapper).iterator();
            }
        }
    )
    .via(someBusiness())
    .runWith(Sink.ignore(), actorSystem)

相关问题