如何访问Akka流的计算结果?

s3fp2yjn  于 2022-11-06  发布在  其他
关注(0)|答案(1)|浏览(354)

我正在尝试返回流操作的结果,在本例中为:
1.合计一张表
1.求值的平方
1.求值的平方
其表示为:

.fold(0, (aggr, next) -> aggr + next)
        .map(x -> x * x)
        .map(x -> x * x)

若要存取我使用的值

final AtomicInteger returnValue = new AtomicInteger();

然后是:

.to(Sink.foreach(x -> {
            returnValue.set(x);
            System.out.println("got: " + x);
        }))

这需要一个阻塞调用来允许流完成,这是不可接受的:

Thread.sleep(2000);

如果我用途:

CompletableFuture<Object> futureValue =
            ask(actorRef, Done.done(), Duration.ofMillis(5000)).toCompletableFuture();
    System.out.println(futureValue.toCompletableFuture().get().toString());

将返回一个错误:

Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://StreamsExamples/system/Materializers/StreamSupervisor-0/$$a-actorRefSource#1663100910]] after [5000 ms]. Message of type [akka.Done$]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.

在这种情况下,收件人执行元是Source,并在Done.done消息上返回以下内容:

return Optional.of(CompletionStrategy.immediately());

可以使用Akka流从流中返回计算值吗?唯一的选择是将计算值存储在DB中,或者在计算值时将其发送到Kafka主题:

.to(Sink.foreach(x -> {


完整的源代码:

import akka.Done;
import akka.actor.ActorRef;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

public class GetStreamValue {

    final static akka.actor.ActorSystem system = akka.actor.ActorSystem.create("StreamsExamples");

    public static void main(String args[]) throws InterruptedException, ExecutionException {

        int bufferSize = 100;
        final Source<Integer, ActorRef> source =
                Source.actorRef(
                        elem -> {
                            // complete stream immediately if we send it Done
                            if (elem == Done.done()) {
                                return Optional.of(CompletionStrategy.immediately());
                            }
                            else {
                                return Optional.empty();
                            }
                        },
                        // never fail the stream because of a message
                        elem -> Optional.empty(),
                        bufferSize,
                        OverflowStrategy.dropHead());

        final AtomicInteger returnValue = new AtomicInteger();

        final ActorRef actorRef = source
                .fold(0, (aggr, next) -> aggr + next)
                .map(x -> x * x)
                .map(x -> x * x)
                .to(Sink.foreach(x -> {
                    returnValue.set(x);
                    System.out.println("got: " + x);
                }))
                .run(system);

        Arrays.asList(1, 2, 3).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
        Arrays.asList(1,2).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
        actorRef.tell(Done.done(), ActorRef.noSender());

        Thread.sleep(2000);

        System.out.println("returnValue is "+returnValue);

    }
}
rjjhvcjd

rjjhvcjd1#

我认为你可能缺少的是理解Akka Streams中物化值的概念。浏览文档的这一部分,特别是关于组合物化值的部分。我也尝试解释这个概念here(搜索 * 物化值 *)。如果你了解物化值,那么也许我在这里写的东西会更有意义。
Source.actorRef(..)的调用返回Source<T, ActorRef>,其中T是流经流的元素的数据类型(在您的示例中是Integer),ActorRef是该Source物化值。当您在RunnableGraph上调用run时,将同步获取物化值,这是to(...)调用返回的值。
ActorRef是您可以按照Source.actorRef(...)语义“驱动”流的方式。
现在的问题是如何获得通过流传递的数据。在您的示例中,您将所有Integers缩减为一个,因此您可以使用Sink.head,而不是使用Sink.foreach(...),因为Sink.foreach(...)会产生副作用。您可以看到,Sink s也可以产生物化值,在Sink.head的情况下,它物化为流中第一个元素的CompletionStage,在您的情况下,它是唯一的元素。

final ActorRef actorRef = source
                                .fold(0, (aggr, next) -> aggr + next)
                                .map(x -> x * x)
                                .map(x -> x * x)
                                .to(Sink.head())
                                .run(system);

好吧,这并没有太大的帮助。你仍然只得到了Source的物化值。要得到Sink的物化值,我们需要显式地请求它:

final Pair<ActorRef, CompletionStage<Integer>> matVals =
      source
        .fold(0, (aggr, next) -> aggr + next)
        .map(x -> x * x)
        .map(x -> x * x)
        .toMat(Sink.head(), Keep.both())
        .run(system);

现在我们得到了SourceSink的物化值,你可以像以前一样通过ActorRef驱动你的流:

final ActorRef actorRef = matVals.first();

Arrays.asList(1, 2, 3).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
Arrays.asList(1,2).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
actorRef.tell(Done.done(), ActorRef.noSender());

也可以使用CompletableStage API从流中获取值。例如:

Integer folded = matVals.second().toCompletableFuture().join();

是的,这是阻塞,但是您需要在流运行完成之前以某种方式阻止主线程完成。

相关问题