我使用final Source<Double, NotUsed> sources = Source.combine(source1, source2 , null, Concat::create);
来合并两个源代码。阅读https://doc.akka.io/docs/akka/current/stream/operators/Source/combine.html并没有提供合并〉2个源代码的例子,所以我不确定使用null是否是合并2个源代码的正确方法。
当我运行下面的代码时,100.0被连续输出。每个源计算值的滑动窗口的平均值,其中每个窗口大小为3。每个源之间的差异是source1
利用率和source2
利用率10。但是source2
没有被执行为
sources.to(printSink).run(actorSystem); just outputs `100` - the first source result.
如何正确地合并source1
和source2
,以便执行每个源代码?
源代码:
import akka.Done;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.javadsl.Concat;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
public class MultipleStreams {
public static void main(String args[]) {
ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "as");
final String json1 = "100";
Sink<Double, CompletionStage<Done>> printSink = Sink.foreach(System.out::println);
final Source<Double, NotUsed> source1 = Source.repeat(json1).throttle(3, Duration.ofMillis(1000))
.sliding(3, 3)
.map(x -> {
final Double b = x.stream()
.mapToDouble(a -> Double.valueOf(a))
.sum() / x.size();
return b;
});
final String json2 = "10";
final Source<Double, NotUsed> source2 = Source.repeat(json2).throttle(3, Duration.ofMillis(1000))
.sliding(3, 3)
.map(x -> {
return x.stream()
.mapToDouble(a -> Double.valueOf(a))
.sum() / x.size();
});
final Source<Double, NotUsed> sources = Source.combine(source1, source2 , null, Concat::create);
sources.to(printSink).run(actorSystem);
}
}
1条答案
按热度按时间nimxete21#
Concat
尝试首先清空第一个源。将其更改为
Merge
,则输出为