我有一个非常简单的演员,只是打印的数字:-
public class PrintLineActor extends AbstractLoggingActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Integer.class, i -> {
System.out.println("Processing: " + i);
sender().tell(i, self());
}).build();
}
}
现在,我有一个流来打印偶数,直到遇到奇数元素:-
@Test
public void streamsTest() throws Exception {
ActorSystem system = ActorSystem.create("testSystem");
ActorRef printActor = system.actorOf(Props.create(PrintLineActor.class));
Integer[] intArray = new Integer[]{2,4,6,8,9,10,12};
CompletionStage<List<Integer>> result = Source.from(Arrays.asList(intArray))
.ask(1, printActor, Integer.class, Timeout.apply(10, TimeUnit.SECONDS))
.takeWhile(i -> i != 9)
.runWith(Sink.seq(), ActorMaterializer.create(system));
List<Integer> result1 = result.toCompletableFuture().get();
System.out.println("Result :- ");
result1.forEach(System.out::println);
}
我不期望任何在9之后的元素被处理,也就是被发送到actor。但是,我看到数字“10”也被actor处理(而不是12),如下面的输出所示
Processing: 2
Processing: 4
Processing: 6
Processing: 8
Processing: 9
Processing: 10 //WHY IS THIS BEING PROCESSED BY ACTOR??
Result :-
2
4
6
8
为什么10正在被演员处理?如何阻止?
编辑:
我试过通过记录事件的时间戳来调试,只是为了看看在9实际完成之前是否正在处理10,但没有,10是在9完全处理之后进行的。下面是日志:-
Before Ask: 2 in 1596035906509
Processing inside Actor: 2 at 1596035906509
Inside TakeWhile 2 at in 1596035906509
Before Ask: 4 in 1596035906609
Processing inside Actor: 4 at 1596035906610
Inside TakeWhile 4 at in 1596035906610
Before Ask: 6 in 1596035906712
Processing inside Actor: 6 at 1596035906712
Inside TakeWhile 6 at in 1596035906712
Before Ask: 8 in 1596035906814
Processing inside Actor: 8 at 1596035906814
Inside TakeWhile 8 at in 1596035906815
Before Ask: 9 in 1596035906915
Processing inside Actor: 9 at 1596035906915
Inside TakeWhile 9 at in 1596035906916
Before Ask: 10 in 1596035907017 //so 10 is taken much after the 9 is processed fully
Processing inside Actor: 10 at 1596035907017
Result :-
2
4
6
8
另外,如果我用一个direct.map(print..)替换.ask,那么10就不会被打印出来。所以为什么当涉及actor.ask时会发生这种情况,这对我来说很奇怪。
2条答案
按热度按时间6yt4nkrj1#
因为您是异步地
ask
printActor而不是同步地打印。在您的确切运行中:要解决确切的问题,请删除异步请求,改为同步打印。但不确定PrintActor是否只是一个类比,请告诉我。
ukqbszuj2#
akka流正在通过不同的流缓冲值。请注意,10被处理,但它不是结果的一部分。如果愿意,您可以配置缓冲区大小: