启动2个线程。dataListUpdateThread
将数字2加到List
中。processFlowThread
将相同List
中的值相加,并将相加后的列表输出到控制台。代码如下:
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import static java.lang.Thread.sleep;
public class SourceExample {
private final static ActorSystem system = ActorSystem.create("SourceExample");
private static void delayOneSecond() {
try {
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void printValue(CompletableFuture<Integer> integerCompletableFuture) {
try {
System.out.println("Sum is " + integerCompletableFuture.get().intValue());
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
final List dataList = new ArrayList<Integer>();
final Thread dataListUpdateThread = new Thread(() -> {
while (true) {
dataList.add(2);
System.out.println(dataList);
delayOneSecond();
}
});
dataListUpdateThread.start();
final Thread processFlowThread = new Thread(() -> {
while (true) {
final Source<Integer, NotUsed> source = Source.from(dataList);
final Sink<Integer, CompletionStage<Integer>> sink =
Sink.fold(0, (agg, next) -> agg + next);
final CompletionStage<Integer> sum = source.runWith(sink, system);
printValue(sum.toCompletableFuture());
delayOneSecond();
}
});
processFlowThread.start();
}
}
我试着创建了一个最简单的例子来回答这个问题。dataListUpdateThread
可以从REST服务或Kafka主题填充List,而不仅仅是将值2添加到List。如果不使用Java线程,该如何实现这个场景?换句话说,如何将dataList
共享到Akka Stream进行处理?
1条答案
按热度按时间rkue9o1l1#
对传递给
Source.from
的集合进行变异只能通过巧合来实现:如果集合用尽了,Source.from
将完成流。这是因为它是为有限的、严格计算的数据而设计的(用例基本上是:a)文档的简单示例和B)在后台执行集合操作时希望限制资源消耗的情况(考虑要向其发送HTTP请求的URL列表)。NB:自从Java 7天之后,我还没有在很大程度上编写过Java,所以我不提供Java代码,只是概述了一些方法。
如前面的答案中所述,
Source.queue
可能是最佳选项(除了使用AkkaHTTP或Alpakka连接器之外)。在这种情况下,流的物化值是一个在流完成之前不会完成的未来值,Source.queue
将永远不会完成流(因为它无法知道它的引用是唯一的引用),引入一个KillSwitch
并通过viaMat
和toMat
进行传播,将给予您能够在流之外做出决定以完成数据流。Source.queue
的另一种选择是Source.actorRef
,它允许您发送一个可区分的消息(Java API中的akka.Done.done()
非常常见)。该源实体化为一个ActorRef
,您可以将tell
消息发送到该源,并且这些消息(至少是那些与流的类型匹配的消息)将可供流使用。对于
Source.queue
和Source.actorRef
,使用prematerialize
通常很有用:在您还需要接收器的物化值的示例中,替代方法是大量使用Mat
运算符来自定义物化值(在Scala中,可以使用元组来至少简化多个物化值的组合,但在Java中,一旦超出了对的范围(就像queue
),我非常肯定您必须定义一个类来保存这三个(队列、killswitch、future for completed value)物化值。同样值得注意的是,由于Akka Streams在后台运行在actor上(因此可以根据需要调度到
ActorSystem
的线程上),因此几乎没有理由创建一个线程来运行流。