我有以下情况:一个至少有10M元素的流。但是有一个最小值1M,如果没有达到最小值,流必须失败。这个流在Akka Streams中是这样实现的:
Source.range(0,10_000_000).via(someBusiness()) .runWith(Sink.ignore(),actorSystem);
Sink.seq()之类的接收器不是一个选项,因为我不希望内存中有这么大的集合。我希望任何流,计数元素,并在流结束时验证计数。
vi4fp9gy1#
大致的想法是:
concat
Source.single
所以类似于(如果Java很糟糕,请道歉)
int[] zeroElems = new int[0]; Source.range(0, 10_000_000) .map(x -> { int[] wrapped = new int[1]; wrapped[0] = x; return wrapped; }) .concat(Source.single(zeroElems)) .statefulMapConcat( () -> { // trick to get around Java prohibition of closing over non-final variables final int[] counter = {0}; return (wrapper) -> { if (counter[0] < 1_000_000) { if (wrapper.length < 1) { throw new RuntimeException("Insufficient number of elements in stream"); } counter[0]++; } return Arrays.stream(wrapper).iterator(); } } ) .via(someBusiness()) .runWith(Sink.ignore(), actorSystem)
1条答案
按热度按时间vi4fp9gy1#
大致的想法是:
concat
-Source.single
,其生成易于区分的 Package 器(例如,零元素数组);在主源耗尽之前,流不会流向该源concat
ted元素;否则展开元素所以类似于(如果Java很糟糕,请道歉)