我必须实现一个java接口,它看起来像这样:
public Flow.Publisher<Packet> getLivePublisher();
此接口必须返回一个Flow.Publisher
,该Flow.Publisher
在被订阅之前保持非活动状态,并且订阅者调用Subscription.next(n)
。
到目前为止,我的实现看起来像
return Source
.fromIterator(() -> new LivePacketIterator())
.async("live-dispatcher")
.runWith(JavaFlowSupport.Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), actorSystem);
不幸的是,这似乎立即开始从我的LivePacketIterator
获得元素,即使没有订阅者已经订阅了返回的Flow.Publisher
。
我知道Source
只是对象的可订阅源的一种模板(我的理解是,它就像一个出版商工厂),并且只有在它被具体化之后才能转换为具体的活动源。我需要以某种方式具体化我的Source
来得到一个Flow.Publisher
,但我希望它以一种只有在被订阅时才开始运行的方式被具体化。
我也尝试过使用toMat()
return Source
.fromIterator(() -> new LivePacketIterator(maximumPacketSize))
.filter(OrderSnapshotPacket::isNotEmpty)
.async(dbDispatcher)
.toMat(JavaFlowSupport.Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), Keep.right())
.???;
但是我不确定如何处理生成的RunnableGraph
。
我的理解正确吗?有没有办法做我想做的事?
1条答案
按热度按时间col17t5w1#
不幸的是,这似乎立即开始从我的LivePacketIterator中获取元素,即使没有订阅者订阅了返回的Flow.Publisher。
你到底观察到了什么来说明这一点?我用了一个和你的非常相似的片段:
在订阅发布者之前,此操作不会开始从列表中发出项目。
我理解Source只是对象的Subscribable源的一种模板(我的理解是它就像发布者的工厂),并且它只有在被具体化之后才转换为具体的活动源
差不多。所有
Flow.*
接口都是reactive streams specification for JVM的一部分。Akka Streams将这些接口视为SPI,并不在其API中直接使用它们。它引入了自己的抽象,如Source
,Flow
和Sink
。Akka Streams允许您将其API中表示的处理流转换为较低级别的Flow.*,就像您在代码片段中所做的那样。如果您想将Akka Streams处理管道插入到其他一些React式流实现(如RxJava或Project Reactor)中,这将非常有用。因此,Source
是Akka Stream的抽象,它在某种程度上等效于Flow.Publisher
,即:它是一个可能包含无限多个值的源。您需要将Source
连接到Sink
(可能通过一个Flow
),这样你就可以得到一个可以 * 运行 * 的RunnableGraph
。这将使一切都开始运行,在 * 大多数情况下 *,它将导致订阅链,元素将开始流过流。但在JavaFlowSupport.Sink.asPublisher
Sink
的情况下,这不是唯一的选择,运行RunnableGraph
会将整个Akka流转换为Flow.Publisher
的一个示例。这里的语义是,订阅被延迟,直到某个地方调用了该示例上的subscribe
。如果我没理解错的话,这正是你想要达到的目的。