在Java Akka流中处理更改的源数据

zengzsys  于 2022-11-06  发布在  Java
关注(0)|答案(1)|浏览(135)

启动2个线程。dataListUpdateThread将数字2加到List中。processFlowThread将相同List中的值相加,并将相加后的列表输出到控制台。代码如下:

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;

import static java.lang.Thread.sleep;

public class SourceExample {

    private final static ActorSystem system = ActorSystem.create("SourceExample");

    private static void delayOneSecond() {
        try {
            sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void printValue(CompletableFuture<Integer> integerCompletableFuture) {
        try {
            System.out.println("Sum is " + integerCompletableFuture.get().intValue());
        } catch (ExecutionException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {

        final List dataList = new ArrayList<Integer>();
        final Thread dataListUpdateThread = new Thread(() -> {
            while (true) {
                dataList.add(2);
                System.out.println(dataList);
                delayOneSecond();
            }
        });
        dataListUpdateThread.start();

        final Thread processFlowThread = new Thread(() -> {
            while (true) {
                final Source<Integer, NotUsed> source = Source.from(dataList);

                final Sink<Integer, CompletionStage<Integer>> sink =
                        Sink.fold(0, (agg, next) -> agg + next);

                final CompletionStage<Integer> sum = source.runWith(sink, system);

                printValue(sum.toCompletableFuture());

                delayOneSecond();
            }
        });

        processFlowThread.start();
    }
}

我试着创建了一个最简单的例子来回答这个问题。dataListUpdateThread可以从REST服务或Kafka主题填充List,而不仅仅是将值2添加到List。如果不使用Java线程,该如何实现这个场景?换句话说,如何将dataList共享到Akka Stream进行处理?

rkue9o1l

rkue9o1l1#

对传递给Source.from的集合进行变异只能通过巧合来实现:如果集合用尽了,Source.from将完成流。这是因为它是为有限的、严格计算的数据而设计的(用例基本上是:a)文档的简单示例和B)在后台执行集合操作时希望限制资源消耗的情况(考虑要向其发送HTTP请求的URL列表)。
NB:自从Java 7天之后,我还没有在很大程度上编写过Java,所以我不提供Java代码,只是概述了一些方法。
如前面的答案中所述,Source.queue可能是最佳选项(除了使用AkkaHTTP或Alpakka连接器之外)。在这种情况下,流的物化值是一个在流完成之前不会完成的未来值,Source.queue将永远不会完成流(因为它无法知道它的引用是唯一的引用),引入一个KillSwitch并通过viaMattoMat进行传播,将给予您能够在流之外做出决定以完成数据流。
Source.queue的另一种选择是Source.actorRef,它允许您发送一个可区分的消息(Java API中的akka.Done.done()非常常见)。该源实体化为一个ActorRef,您可以将tell消息发送到该源,并且这些消息(至少是那些与流的类型匹配的消息)将可供流使用。
对于Source.queueSource.actorRef,使用prematerialize通常很有用:在您还需要接收器的物化值的示例中,替代方法是大量使用Mat运算符来自定义物化值(在Scala中,可以使用元组来至少简化多个物化值的组合,但在Java中,一旦超出了对的范围(就像queue),我非常肯定您必须定义一个类来保存这三个(队列、killswitch、future for completed value)物化值。
同样值得注意的是,由于Akka Streams在后台运行在actor上(因此可以根据需要调度到ActorSystem的线程上),因此几乎没有理由创建一个线程来运行流。

相关问题