更改akka流的源数据

ddrv8njm  于 2022-11-06  发布在  其他
关注(0)|答案(1)|浏览(106)

我正在学习Java Akka流,并使用https://doc.akka.io/docs/akka/current/stream/stream-flows-and-basics.html定义了以下内容:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;

public class SourceExample {

    static ActorSystem system = ActorSystem.create("SourceExample");

    public static void main(String args[]) throws ExecutionException, InterruptedException {

        final List<Integer> sourceData = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        final Source<Integer, NotUsed> source =
                Source.from(sourceData);
        final Sink<Integer, CompletionStage<Integer>> sink =
                Sink.<Integer, Integer>fold(0, (agg, next) -> agg + next);

        final CompletionStage<Integer> sum = source.runWith(sink, system);

        System.out.println(sum.toCompletableFuture().get());
    }

}

运行此代码的行为与预期一致。
Akka Streams正在解决的问题是此代码可以重复执行。
在现实场景中,sourceData不会是静态的,Akka Streams对如何处理变化的数据有意见吗?还是由开发人员决定?
在最简单的情况下,当源数据发生变化时,每X分钟重新执行一次流Flow(例如,使用计划任务)。或者Akka流是长期存在的,源数据发生变化,流计算根据一些参数重新执行?
Akka Streams文档定义了多个数据源,但我不明白应该如何利用Akka Streams来处理不断变化的源数据。

k97glaaz

k97glaaz1#

Akka Streams可以,而且经常会一直运行到应用程序停止前不久。例如,在应用程序中,经常会有一个流消耗(例如,使用Alpakka Kafka的Kafka消费者源)Kafka记录,并且在应用程序被杀死之前不会停止。
详细地说,一条小溪一直流到这样的时候:

  • 阶段发出完成信号(例如,在您的示例中,Source.from将在发出10后发出完成信号)
  • 阶段失败(通常引发异常)

Source.queue是一个对动态数据有用的示例源(没有引入Alpakka或Akka HTTP),它具体化为一个队列,对于该队列,入队的元素可用于流。

相关问题