我正在尝试返回流操作的结果,在本例中为:
1.合计一张表
1.求值的平方
1.求值的平方
其表示为:
.fold(0, (aggr, next) -> aggr + next)
.map(x -> x * x)
.map(x -> x * x)
若要存取我使用的值
final AtomicInteger returnValue = new AtomicInteger();
然后是:
.to(Sink.foreach(x -> {
returnValue.set(x);
System.out.println("got: " + x);
}))
这需要一个阻塞调用来允许流完成,这是不可接受的:
Thread.sleep(2000);
如果我用途:
CompletableFuture<Object> futureValue =
ask(actorRef, Done.done(), Duration.ofMillis(5000)).toCompletableFuture();
System.out.println(futureValue.toCompletableFuture().get().toString());
将返回一个错误:
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://StreamsExamples/system/Materializers/StreamSupervisor-0/$$a-actorRefSource#1663100910]] after [5000 ms]. Message of type [akka.Done$]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
在这种情况下,收件人执行元是Source,并在Done.done
消息上返回以下内容:
return Optional.of(CompletionStrategy.immediately());
可以使用Akka流从流中返回计算值吗?唯一的选择是将计算值存储在DB中,或者在计算值时将其发送到Kafka主题:
.to(Sink.foreach(x -> {
?
完整的源代码:
import akka.Done;
import akka.actor.ActorRef;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
public class GetStreamValue {
final static akka.actor.ActorSystem system = akka.actor.ActorSystem.create("StreamsExamples");
public static void main(String args[]) throws InterruptedException, ExecutionException {
int bufferSize = 100;
final Source<Integer, ActorRef> source =
Source.actorRef(
elem -> {
// complete stream immediately if we send it Done
if (elem == Done.done()) {
return Optional.of(CompletionStrategy.immediately());
}
else {
return Optional.empty();
}
},
// never fail the stream because of a message
elem -> Optional.empty(),
bufferSize,
OverflowStrategy.dropHead());
final AtomicInteger returnValue = new AtomicInteger();
final ActorRef actorRef = source
.fold(0, (aggr, next) -> aggr + next)
.map(x -> x * x)
.map(x -> x * x)
.to(Sink.foreach(x -> {
returnValue.set(x);
System.out.println("got: " + x);
}))
.run(system);
Arrays.asList(1, 2, 3).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
Arrays.asList(1,2).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
actorRef.tell(Done.done(), ActorRef.noSender());
Thread.sleep(2000);
System.out.println("returnValue is "+returnValue);
}
}
1条答案
按热度按时间rjjhvcjd1#
我认为你可能缺少的是理解Akka Streams中物化值的概念。浏览文档的这一部分,特别是关于组合物化值的部分。我也尝试解释这个概念here(搜索 * 物化值 *)。如果你了解物化值,那么也许我在这里写的东西会更有意义。
对
Source.actorRef(..)
的调用返回Source<T, ActorRef>
,其中T是流经流的元素的数据类型(在您的示例中是Integer
),ActorRef
是该Source
的物化值。当您在RunnableGraph
上调用run
时,将同步获取物化值,这是to(...)
调用返回的值。ActorRef
是您可以按照Source.actorRef(...)
语义“驱动”流的方式。现在的问题是如何获得通过流传递的数据。在您的示例中,您将所有
Integers
缩减为一个,因此您可以使用Sink.head,而不是使用Sink.foreach(...)
,因为Sink.foreach(...)
会产生副作用。您可以看到,Sink
s也可以产生物化值,在Sink.head
的情况下,它物化为流中第一个元素的CompletionStage
,在您的情况下,它是唯一的元素。好吧,这并没有太大的帮助。你仍然只得到了
Source
的物化值。要得到Sink
的物化值,我们需要显式地请求它:现在我们得到了
Source
和Sink
的物化值,你可以像以前一样通过ActorRef
驱动你的流:也可以使用
CompletableStage
API从流中获取值。例如:是的,这是阻塞,但是您需要在流运行完成之前以某种方式阻止主线程完成。