基于Akka actor源的滑动窗口未按预期运行

ttp71kqs  于 2022-11-06  发布在  其他
关注(0)|答案(1)|浏览(159)

使用下面的代码,我试图使用一个演员作为源,并发送类型为Double的消息,通过滑动窗口处理。
滑动窗口被定义为sliding(2, 2),以计算发送的twp值的每个序列。
发送消息:

actorRef.tell(10, ActorRef.noSender());
        actorRef.tell(20, ActorRef.noSender());
        actorRef.tell(30, ActorRef.noSender());
        actorRef.tell(40, ActorRef.noSender());
        actorRef.tell(20, ActorRef.noSender());

应按如下方式计算平均值:

10 + 20 / 2 = 15
30 + 40 / 2 = 35

但是在下面的代码中似乎没有调用该计算。
这里我输出的值:

movingAverage.runForeach(n -> {
        if( n > 0){
            System.out.println(n);
        }
    }, system);

源代码:

import akka.Done;
import akka.actor.ActorRef;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.util.Optional;

public class FilterThreshold {

    public static void main(String[] args) {

        final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("Source");

        final int bufferSize = 1;
        final Source<Double, ActorRef> source =
                Source.actorRef(
                        elem -> {
                            // complete stream immediately if we send it Done
                            if (elem == Done.done()) {
                                return Optional.of(CompletionStrategy.immediately());
                            } else {
                                return Optional.empty();
                            }
                        },
                        // never fail the stream because of a message
                        elem -> Optional.empty(),
                        bufferSize,
                        OverflowStrategy.dropHead());
        ActorRef actorRef = source.to(Sink.foreach(System.out::println)).run(system);

        actorRef.tell(10, ActorRef.noSender());
        actorRef.tell(20, ActorRef.noSender());
        actorRef.tell(30, ActorRef.noSender());
        actorRef.tell(40, ActorRef.noSender());
        actorRef.tell(20, ActorRef.noSender());

        Source<Double, ActorRef> movingAverage = source
                .sliding(2, 2)
                .map(window -> (window.stream().mapToDouble(i -> i).sum()) / window.size());

        movingAverage.runForeach(n -> {
            if( n > 0){
                System.out.println(n);
            }
        }, system);

    }
}

我已经编辑了https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/sliding.html中的代码
如何应用定义为movingAverage的滑动窗口函数来计算通过Akka actor actorRef发送的值?
更新:
方法permaterialize将actor系统作为参数。
正在从以下位置更新代码:

final Pair<ActorRef, Source<Double, ActorRef>> prematPair = source.preMaterialize();

至:

final Pair<ActorRef, Source<Double, ActorRef>> prematPair = source.preMaterialize(system);

会导致编译时错误:

Required type:
Pair
<ActorRef,
Source<Double, ActorRef>>

Provided:
Pair
<ActorRef,
Source<Double, NotUsed>>

我是否应该使用其他方法?
发布的更新代码:

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.japi.Pair;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Source;

import java.util.Optional;

public class FilterThreshold {

    public static void main(String[] args) {

        final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("Source");

        final int bufferSize = 1;
        final Source<Double, ActorRef> source =
                Source.actorRef(
                        elem -> {
                            System.out.println("elem is "+elem);
                            // complete stream immediately if we send it Done
                            if (elem == Done.done()) {
                                return Optional.of(CompletionStrategy.immediately());
                            } else {
                                return Optional.empty();
                            }
                        },
                        // never fail the stream because of a message
                        elem -> Optional.empty(),
                        bufferSize,
                        OverflowStrategy.dropHead());

        // source is as before
        final Pair<ActorRef, Source<Double, ActorRef>> prematPair = source.preMaterialize(system);

        Flow<Double, Double, NotUsed> movingAverageFlow =
                Flow.of(Double.class)
                        .sliding(2, 2)
                        .map(window -> (window.stream().mapToDouble(i -> i).sum()) / window.size());

        final Source<Double, ActorRef> prematSource = prematPair.second();

        prematSource.via(movingAverageFlow).runForeach(n -> {
            System.out.println("n is "+n);
            if (n > 0) {
                System.out.println(n);
            }
        }, system);

        final ActorRef actorRef = prematPair.first();

        actorRef.tell(10, ActorRef.noSender());
        actorRef.tell(20, ActorRef.noSender());
        actorRef.tell(20, ActorRef.noSender());
        actorRef.tell(20, ActorRef.noSender());
        actorRef.tell(20, ActorRef.noSender());

    }
}

更新2:
使用代码:

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.japi.Pair;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Source;

import java.util.Optional;

public class FilterThreshold {

    public static void main(String[] args) {

        final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("Source");

        final int bufferSize = 1;
        final Source<Double, ActorRef> source =
                Source.actorRef(
                        elem -> {
                            System.out.println("elem is "+elem);
                            // complete stream immediately if we send it Done
                            if (elem == Done.done()) {
                                return Optional.of(CompletionStrategy.immediately());
                            } else {
                                return Optional.empty();
                            }
                        },
                        // never fail the stream because of a message
                        elem -> Optional.empty(),
                        bufferSize,
                        OverflowStrategy.dropHead());

        // source is as before
        final Pair<ActorRef, Source<Double, NotUsed>> prematPair = source.preMaterialize(system);
        final ActorRef actorRef = prematPair.first();
        final Source<Double, NotUsed> prematSource = prematPair.second();

        Flow<Double, Double, NotUsed> movingAverageFlow =
                Flow.of(Double.class)
                        .sliding(2, 2)
                        .map(window -> (window.stream().mapToDouble(i -> i).sum()) / window.size());

        prematSource.via(movingAverageFlow).runForeach(n -> {
            System.out.println("n is "+n);
            if (n > 0) {
                System.out.println(n);
            }
        }, system);

        actorRef.tell(10, ActorRef.noSender());
        actorRef.tell(20, ActorRef.noSender());
        actorRef.tell(20, ActorRef.noSender());
        actorRef.tell(20, ActorRef.noSender());
        actorRef.tell(20, ActorRef.noSender());

        prematSource.run(system);

    }
}

印刷品:

elem is 10
elem is 20
elem is 20
elem is 20
elem is 20

因此,看起来消息被正确发送,但移动平均值没有被具体化。
使用prematSource.run(system);不是实现值的正确方法吗?

x6yk4ghg

x6yk4ghg1#

简单地说,你的source是具体化一个Source<Double, ActorRef>的一种方法,每个具体化最终都是一个不同的源。
在您的代码中,source.to(Sink.foreach(System.out::println)).run(system)是一个流,实体化的actorRef仅连接到此流,而

movingAverage.runForeach(n -> {
    if( n > 0){
        System.out.println(n);
    }
}, system);

是具有不同的具体化的ActorRef的完全独立的流(由于runForeach具体化为CompletionStage<Done>,因此最终被丢弃。
在处理Source.actorRef时,在运行流之前预物化源代码通常是一个好主意:

import akka.NotUsed
import akka.japi.Pair
import akka.stream.javadsl.Flow

// source is as before
final Pair<ActorRef, Source<Double, NotUsed>> prematPair = source.preMaterialize(system);
final ActorRef actorRef = prematPair.first();
final Source<Double, NotUsed> prematSource = prematPair.second();

Flow<Double, Double, NotUsed> movingAverageFlow =
    Flow.of(Double.class)
        .sliding(2, 2)
        .map(window -> (window.stream().mapToDouble(i -> i).sum()) / window.size());

prematSource.via(movingAverageFlow).runForeach(n -> {
    if (n > 0) {
      System.out.println(n);
    }
}, system);

(抱歉,我的Java相当生疏)

相关问题