我是Akka和Akka流的新手。我创建了一个虚拟流,我希望它以一个异常结束,因为我的map()函数非常慢,我将缓冲区设置为1
。
所以我的问题分为两部分:
1.为什么这段代码可以正常工作而不会失败?
1.如何模拟溢出?(用于学习目的)
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
public class Application {
public static void main(String[] args) {
final ActorSystem system = ActorSystem.create("reactive-test");
Source<Integer, NotUsed> source =
Source.range(0, 10000000)
.buffer(1, OverflowStrategy.fail())
.map(Application::doubleInt);
source.runWith(Sink.foreach(a -> System.out.println(a)), system);
}
private static Integer doubleInt(int i) {
try {
Thread.sleep(2_000);
} catch (Exception e) {
System.out.println(e);
}
return 2 * i;
}
}
1条答案
按热度按时间zxlwwiss1#
为什么这段代码可以正常工作而不会失败?
原因是反压力。源产生的元素不会比接收器消耗的元素多,因此慢的接收器会直接影响元素产生的速度。因此,缓冲区永远不会溢出。
如何模拟溢出?(用于学习目的)
拥有一个渴望消费但同时又很慢的接收器。可以通过添加一个
grouped(1000)
来模拟它,该grouped(1000)
创建一个包含1000个元素的列表并将其传递给下游。生产