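With the code below, I'm trying to use an actor as a source and send messages of type Double to be processed through a sliding window.
The sliding window is defined as sliding(2, 2), so that each sequence of two values sent is averaged.
Sending the messages: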
actorRef.tell(10, ActorRef.noSender());
actorRef.tell(20, ActorRef.noSender());
actorRef.tell(30, ActorRef.noSender());
actorRef.tell(40, ActorRef.noSender());
actorRef.tell(20, ActorRef.noSender());
should result in the averages being calculated as follows:
(10 + 20) / 2 = 15
(30 + 40) / 2 = 35
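To make the expected result concrete, here is a minimal plain-Java sketch of the windowing arithmetic, with no Akka involved. The class SlidingAverageSketch and the helper windowAverages are hypothetical names used only for illustration, and the sketch assumes that only full windows are averaged, so the trailing 20 (which has no partner yet) is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SlidingAverageSketch {
    // Hypothetical helper: averages each full window of `size` elements,
    // moving the window start forward by `step` elements each time.
    static List<Double> windowAverages(List<Double> values, int size, int step) {
        List<Double> averages = new ArrayList<>();
        for (int start = 0; start + size <= values.size(); start += step) {
            double sum = 0.0;
            for (int i = start; i < start + size; i++) {
                sum += values.get(i);
            }
            averages.add(sum / size);
        }
        return averages;
    }

    public static void main(String[] args) {
        List<Double> sent = Arrays.asList(10.0, 20.0, 30.0, 40.0, 20.0);
        // Windows of two, stepping by two: [10, 20] and [30, 40]
        System.out.println(windowAverages(sent, 2, 2)); // prints [15.0, 35.0]
    }
}
```

This is only the arithmetic I expect from sliding(2, 2) followed by the averaging map; it says nothing about how the stream itself is wired up.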
But the calculation does not appear to be invoked in the code below.
This is where I print the values:
movingAverage.runForeach(n -> {
if( n > 0){
System.out.println(n);
}
}, system);
Source code:
import akka.Done;
import akka.actor.ActorRef;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.Optional;
public class FilterThreshold {
public static void main(String[] args) {
final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("Source");
final int bufferSize = 1;
final Source<Double, ActorRef> source =
Source.actorRef(
elem -> {
// complete stream immediately if we send it Done
if (elem == Done.done()) {
return Optional.of(CompletionStrategy.immediately());
} else {
return Optional.empty();
}
},
// never fail the stream because of a message
elem -> Optional.empty(),
bufferSize,
OverflowStrategy.dropHead());
ActorRef actorRef = source.to(Sink.foreach(System.out::println)).run(system);
actorRef.tell(10, ActorRef.noSender());
actorRef.tell(20, ActorRef.noSender());
actorRef.tell(30, ActorRef.noSender());
actorRef.tell(40, ActorRef.noSender());
actorRef.tell(20, ActorRef.noSender());
Source<Double, ActorRef> movingAverage = source
.sliding(2, 2)
.map(window -> (window.stream().mapToDouble(i -> i).sum()) / window.size());
movingAverage.runForeach(n -> {
if( n > 0){
System.out.println(n);
}
}, system);
}
}
I adapted the code from https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/sliding.html
How can I apply the sliding window function defined as movingAverage to the values sent through the Akka actor actorRef?
Update:
The preMaterialize method takes the actor system as a parameter.
Updating the code from:
final Pair<ActorRef, Source<Double, ActorRef>> prematPair = source.preMaterialize();
to:
final Pair<ActorRef, Source<Double, ActorRef>> prematPair = source.preMaterialize(system);
causes a compile-time error:
Required type: Pair<ActorRef, Source<Double, ActorRef>>
Provided: Pair<ActorRef, Source<Double, NotUsed>>
Should I be using a different method?
Updated code as posted:
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.japi.Pair;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Source;
import java.util.Optional;
public class FilterThreshold {
public static void main(String[] args) {
final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("Source");
final int bufferSize = 1;
final Source<Double, ActorRef> source =
Source.actorRef(
elem -> {
System.out.println("elem is "+elem);
// complete stream immediately if we send it Done
if (elem == Done.done()) {
return Optional.of(CompletionStrategy.immediately());
} else {
return Optional.empty();
}
},
// never fail the stream because of a message
elem -> Optional.empty(),
bufferSize,
OverflowStrategy.dropHead());
// source is as before
final Pair<ActorRef, Source<Double, ActorRef>> prematPair = source.preMaterialize(system);
Flow<Double, Double, NotUsed> movingAverageFlow =
Flow.of(Double.class)
.sliding(2, 2)
.map(window -> (window.stream().mapToDouble(i -> i).sum()) / window.size());
final Source<Double, ActorRef> prematSource = prematPair.second();
prematSource.via(movingAverageFlow).runForeach(n -> {
System.out.println("n is "+n);
if (n > 0) {
System.out.println(n);
}
}, system);
final ActorRef actorRef = prematPair.first();
actorRef.tell(10, ActorRef.noSender());
actorRef.tell(20, ActorRef.noSender());
actorRef.tell(20, ActorRef.noSender());
actorRef.tell(20, ActorRef.noSender());
actorRef.tell(20, ActorRef.noSender());
}
}
Update 2:
Using this code:
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.japi.Pair;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Source;
import java.util.Optional;
public class FilterThreshold {
public static void main(String[] args) {
final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("Source");
final int bufferSize = 1;
final Source<Double, ActorRef> source =
Source.actorRef(
elem -> {
System.out.println("elem is "+elem);
// complete stream immediately if we send it Done
if (elem == Done.done()) {
return Optional.of(CompletionStrategy.immediately());
} else {
return Optional.empty();
}
},
// never fail the stream because of a message
elem -> Optional.empty(),
bufferSize,
OverflowStrategy.dropHead());
// source is as before
final Pair<ActorRef, Source<Double, NotUsed>> prematPair = source.preMaterialize(system);
final ActorRef actorRef = prematPair.first();
final Source<Double, NotUsed> prematSource = prematPair.second();
Flow<Double, Double, NotUsed> movingAverageFlow =
Flow.of(Double.class)
.sliding(2, 2)
.map(window -> (window.stream().mapToDouble(i -> i).sum()) / window.size());
prematSource.via(movingAverageFlow).runForeach(n -> {
System.out.println("n is "+n);
if (n > 0) {
System.out.println(n);
}
}, system);
actorRef.tell(10, ActorRef.noSender());
actorRef.tell(20, ActorRef.noSender());
actorRef.tell(20, ActorRef.noSender());
actorRef.tell(20, ActorRef.noSender());
actorRef.tell(20, ActorRef.noSender());
prematSource.run(system);
}
}
Prints:
elem is 10
elem is 20
elem is 20
elem is 20
elem is 20
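So it looks like the messages are being sent correctly, but the moving average is not being materialized.
Is prematSource.run(system); not the correct way to materialize the values?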
1 Answer
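Put simply, your source is a recipe of sorts for materializing a Source<Double, ActorRef>, and each materialization ends up being a different source. In your code, source.to(Sink.foreach(System.out::println)).run(system) is one stream, and the materialized actorRef is connected only to that stream. movingAverage.runForeach(...) is a completely separate stream with its own, distinct materialized ActorRef, which ends up being discarded because runForeach materializes as a CompletionStage<Done>. When dealing with Source.actorRef, it is often a good idea to prematerialize the source before running the stream: (apologies, my Java is fairly rusty)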
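A minimal sketch of that prematerialization approach, assuming Akka 2.6's Java DSL on the classpath and using only calls that already appear in the question (Source.actorRef, preMaterialize(system), sliding, map, runForeach). The class name MovingAverageSketch, the buffer size of 16, and sending Double literals such as 10.0 instead of the Integer 10 are my own assumptions, not something the original code confirms.

```java
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Source;
import java.util.Optional;

public class MovingAverageSketch {
    public static void main(String[] args) {
        final ActorSystem system = ActorSystem.create("Source");
        // Assumption: a buffer larger than 1, so rapid tells are less likely to be dropped by dropHead
        final int bufferSize = 16;

        final Source<Double, ActorRef> source =
            Source.actorRef(
                elem -> {
                    // complete the stream immediately if we send it Done
                    if (elem == Done.done()) {
                        return Optional.of(CompletionStrategy.immediately());
                    } else {
                        return Optional.empty();
                    }
                },
                // never fail the stream because of a message
                elem -> Optional.empty(),
                bufferSize,
                OverflowStrategy.dropHead());

        // Materialize the actor-backed source exactly once: first() is the ActorRef,
        // second() is a Source<Double, NotUsed> fed by that same ActorRef.
        final Pair<ActorRef, Source<Double, NotUsed>> prematPair = source.preMaterialize(system);
        final ActorRef actorRef = prematPair.first();
        final Source<Double, NotUsed> prematSource = prematPair.second();

        // Attach the sliding-window average to the prematerialized source and run that single stream.
        prematSource
            .sliding(2, 2)
            .map(window -> window.stream().mapToDouble(d -> d).sum() / window.size())
            .runForeach(avg -> System.out.println("average is " + avg), system);

        // Assumption: send Doubles, since the source is typed as Source<Double, ...>
        actorRef.tell(10.0, ActorRef.noSender());
        actorRef.tell(20.0, ActorRef.noSender());
        actorRef.tell(30.0, ActorRef.noSender());
        actorRef.tell(40.0, ActorRef.noSender());
    }
}
```

The point of the sketch is that the ActorRef receiving the tell calls and the stream doing the averaging come from the same materialization. The program keeps running until the actor system is terminated.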