akka 合并流中的2个源

rqqzpn5f  于 2022-11-05  发布在  其他
关注(0)|答案(1)|浏览(164)

我使用final Source<Double, NotUsed> sources = Source.combine(source1, source2 , null, Concat::create);来合并两个源代码。阅读https://doc.akka.io/docs/akka/current/stream/operators/Source/combine.html并没有提供合并〉2个源代码的例子,所以我不确定使用null是否是合并2个源代码的正确方法。
当我运行下面的代码时,100.0被连续输出。每个源计算值的滑动窗口的平均值,其中每个窗口大小为3。每个源之间的差异是source1利用率和source2利用率10。但是source2没有被执行为

sources.to(printSink).run(actorSystem); just outputs `100` - the first source result.

如何正确地合并source1source2,以便执行每个源代码?
源代码:

import akka.Done;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.javadsl.Concat;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.time.Duration;
import java.util.concurrent.CompletionStage;

public class MultipleStreams {

    public static void main(String args[]) {

        ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "as");

        final String json1 = "100";
        Sink<Double, CompletionStage<Done>> printSink = Sink.foreach(System.out::println);

        final Source<Double, NotUsed> source1 = Source.repeat(json1).throttle(3, Duration.ofMillis(1000))
                .sliding(3, 3)
                .map(x -> {
                    final Double b = x.stream()
                            .mapToDouble(a -> Double.valueOf(a))
                            .sum() / x.size();
                    return b;
                });

        final String json2 = "10";
        final Source<Double, NotUsed> source2 = Source.repeat(json2).throttle(3, Duration.ofMillis(1000))
                .sliding(3, 3)
                .map(x -> {
                    return x.stream()
                            .mapToDouble(a -> Double.valueOf(a))
                            .sum() / x.size();
                });

        final Source<Double, NotUsed> sources = Source.combine(source1, source2 , null, Concat::create);

        sources.to(printSink).run(actorSystem);

    }
}
nimxete2

nimxete21#

Concat尝试首先清空第一个源。
将其更改为Merge,则输出为

100.0
10.0
100.0
10.0
100.0
10.0
100.0
10.0
10.0
100.0
10.0

相关问题