Akka Streams takeWhile即使在条件失败后仍处理下一个元素

dm7nw8vv  于 2022-11-06  发布在  其他
关注(0)|答案(2)|浏览(148)

我有一个非常简单的演员,只是打印的数字:-

public class PrintLineActor extends AbstractLoggingActor {

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(Integer.class, i -> {
          System.out.println("Processing: " + i);
          sender().tell(i, self());
        }).build();
  }
}

现在,我有一个流来打印偶数,直到遇到奇数元素:-

@Test
  public void streamsTest() throws Exception {

    ActorSystem system = ActorSystem.create("testSystem");
    ActorRef printActor = system.actorOf(Props.create(PrintLineActor.class));

    Integer[] intArray = new Integer[]{2,4,6,8,9,10,12};
    CompletionStage<List<Integer>> result = Source.from(Arrays.asList(intArray))
        .ask(1, printActor, Integer.class, Timeout.apply(10, TimeUnit.SECONDS))
        .takeWhile(i -> i != 9)
        .runWith(Sink.seq(), ActorMaterializer.create(system));

    List<Integer> result1 = result.toCompletableFuture().get();
    System.out.println("Result :- ");
    result1.forEach(System.out::println);
  }

不期望任何在9之后的元素被处理,也就是被发送到actor。但是,我看到数字“10”也被actor处理(而不是12),如下面的输出所示

Processing: 2
Processing: 4
Processing: 6
Processing: 8
Processing: 9
Processing: 10 //WHY IS THIS BEING PROCESSED BY ACTOR??

Result :- 
2
4
6
8

为什么10正在被演员处理?如何阻止?
编辑:
我试过通过记录事件的时间戳来调试,只是为了看看在9实际完成之前是否正在处理10,但没有,10是在9完全处理之后进行的。下面是日志:-

Before Ask: 2 in 1596035906509
Processing inside Actor: 2 at 1596035906509
Inside TakeWhile 2 at  in 1596035906509

Before Ask: 4 in 1596035906609
Processing inside Actor: 4 at 1596035906610
Inside TakeWhile 4 at  in 1596035906610

Before Ask: 6 in 1596035906712
Processing inside Actor: 6 at 1596035906712
Inside TakeWhile 6 at  in 1596035906712

Before Ask: 8 in 1596035906814
Processing inside Actor: 8 at 1596035906814
Inside TakeWhile 8 at  in 1596035906815

Before Ask: 9 in 1596035906915
Processing inside Actor: 9 at 1596035906915
Inside TakeWhile 9 at  in 1596035906916

Before Ask: 10 in 1596035907017 //so 10 is taken much after the 9 is processed fully
Processing inside Actor: 10 at 1596035907017

Result :- 
2
4
6
8

另外,如果我用一个direct.map(print..)替换.ask,那么10就不会被打印出来。所以为什么当涉及actor.ask时会发生这种情况,这对我来说很奇怪。

6yt4nkrj

6yt4nkrj1#

因为您是异步地ask printActor而不是同步地打印。在您的确切运行中:

  • 消息9到达PrintActor,打印“正在处理:9英寸
  • 消息10到达PrintActor,打印“正在处理:10英寸
  • 您的Akka流从PrintActor接收消息9的响应消息,完成Akka流,因此在结果中既没有9也没有10。

要解决确切的问题,请删除异步请求,改为同步打印。但不确定PrintActor是否只是一个类比,请告诉我。

ukqbszuj

ukqbszuj2#

akka流正在通过不同的流缓冲值。请注意,10被处理,但它不是结果的一部分。如果愿意,您可以配置缓冲区大小:

.ask(1, printActor, Integer.class, Timeout.apply(10, TimeUnit.SECONDS)).buffer(1, OverflowStrategy.backpressure)

相关问题