使用akka流时文件夹未按预期工作

uhry853o  于 2022-11-05  发布在  其他
关注(0)|答案(1)|浏览(161)

下面是我编写的代码,尝试输出从本指南编辑的每个akka消息的总和:
https://doc.akka.io/docs/akka/current/stream/stream-flows-and-basics.html

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.japi.Pair;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class AkkaSourceTesting {

    public static void main(String args[]){

        ActorSystem actorSystem = ActorSystem.create(Behaviors.empty() , "actorSystem");

        Source<Integer, ActorRef> matValuePoweredSource =
                Source.actorRef(
                        elem -> {
                            // complete stream immediately if we send it Done
                            if (elem == Done.done()) return Optional.of(CompletionStrategy.immediately());
                            else return Optional.empty();
                        },
                        // never fail the stream because of a message
                        elem -> Optional.empty(),
                        100,
                        OverflowStrategy.fail());

        Pair<ActorRef, Source<Integer, NotUsed>> actorRefSourcePair =
                matValuePoweredSource.preMaterialize(actorSystem);

        actorRefSourcePair.first().tell(1, ActorRef.noSender());
        actorRefSourcePair.first().tell(1, ActorRef.noSender());
        actorRefSourcePair.first().tell(1, ActorRef.noSender());

        Flow<Integer, Integer, NotUsed> groupedFlow = Flow.of(Integer.class)
                .grouped(2)
                .map(value -> {
                    List<Integer> newList = new ArrayList<>(value);
                    return newList;
                })
                .mapConcat(value -> value);

        // pass source around for materialization
        actorRefSourcePair.second().via(Flow.of(Integer.class).via(groupedFlow).fold(0, (res, element) -> res + element)).runWith(Sink.foreach(System.out::println), actorSystem);

    }
}

折叠操作似乎导致控制台上无输出。
然而如果我用

actorRefSourcePair.second().via(Flow.of(Integer.class).via(groupedFlow).map(x -> x * 2)).runWith(Sink.foreach(System.out::println), actorSystem);

而不是

actorRefSourcePair.second().via(Flow.of(Integer.class).via(groupedFlow).fold(0, (res, element) -> res + element)).runWith(Sink.foreach(System.out::println), actorSystem);

然后输出以下内容:

2
2

我试图对列表进行分组,并对每个组执行折叠操作,但折叠操作甚至没有执行。我是否错过了某个步骤?

5uzkadbs

5uzkadbs1#

Flow.fold在上游完成之前不会发出值。
还要注意,您的groupedFlow是一个身份流:可以在不改变任何内容的情况下将其删除:

  • grouped获取每一个连续的元素对,并将它们绑定到List
  • map级将该List转换为ArrayList
  • mapConcat展开ArrayList并发出元素

在Java中,您所寻找的(连续的2个组的对之和的流)最清晰的表达可能是沿着以下路线

actorRefSourcePair.second()
    .grouped(2)
    .map(twoList -> twoList.stream().reduce(0, Integer::sum))
    .runWith(Sink.foreach(System.out::println), actorSystem);

相关问题