如何创建从akka-stream Source订阅时启动的Flow.Publisher

h79rfbju  于 2022-11-05  发布在  其他
关注(0)|答案(1)|浏览(152)

我必须实现一个java接口,它看起来像这样:

public Flow.Publisher<Packet> getLivePublisher();

此接口必须返回一个Flow.Publisher,该Flow.Publisher在被订阅之前保持非活动状态,并且订阅者调用Subscription.next(n)
到目前为止,我的实现看起来像

return Source
    .fromIterator(() -> new LivePacketIterator())
    .async("live-dispatcher")
    .runWith(JavaFlowSupport.Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), actorSystem);

不幸的是,这似乎立即开始从我的LivePacketIterator获得元素,即使没有订阅者已经订阅了返回的Flow.Publisher
我知道Source只是对象的可订阅源的一种模板(我的理解是,它就像一个出版商工厂),并且只有在它被具体化之后才能转换为具体的活动源。我需要以某种方式具体化我的Source来得到一个Flow.Publisher,但我希望它以一种只有在被订阅时才开始运行的方式被具体化。
我也尝试过使用toMat()

return Source
                .fromIterator(() -> new LivePacketIterator(maximumPacketSize))
                .filter(OrderSnapshotPacket::isNotEmpty)
                .async(dbDispatcher)
                .toMat(JavaFlowSupport.Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), Keep.right())
                .???;

但是我不确定如何处理生成的RunnableGraph
我的理解正确吗?有没有办法做我想做的事?

col17t5w

col17t5w1#

不幸的是,这似乎立即开始从我的LivePacketIterator中获取元素,即使没有订阅者订阅了返回的Flow.Publisher。
你到底观察到了什么来说明这一点?我用了一个和你的非常相似的片段:

Flow.Publisher<Integer> integerPublisher =
      Source.from(List.of(1,2,3,4,5))
            .wireTap(System.out::println)
            .async()
            .runWith(
              JavaFlowSupport.Sink.asPublisher(AsPublisher.WITHOUT_FANOUT),
              ActorSystem.create());

在订阅发布者之前,此操作不会开始从列表中发出项目。
我理解Source只是对象的Subscribable源的一种模板(我的理解是它就像发布者的工厂),并且它只有在被具体化之后才转换为具体的活动源
差不多。所有Flow.*接口都是reactive streams specification for JVM的一部分。Akka Streams将这些接口视为SPI,并不在其API中直接使用它们。它引入了自己的抽象,如SourceFlowSink。Akka Streams允许您将其API中表示的处理流转换为较低级别的Flow.*,就像您在代码片段中所做的那样。如果您想将Akka Streams处理管道插入到其他一些React式流实现(如RxJavaProject Reactor)中,这将非常有用。因此,Source是Akka Stream的抽象,它在某种程度上等效于Flow.Publisher,即:它是一个可能包含无限多个值的源。您需要将Source连接到Sink(可能通过一个Flow),这样你就可以得到一个可以 * 运行 * 的RunnableGraph。这将使一切都开始运行,在 * 大多数情况下 *,它将导致订阅链,元素将开始流过流。但在JavaFlowSupport.Sink.asPublisherSink的情况下,这不是唯一的选择,运行RunnableGraph会将整个Akka流转换为Flow.Publisher的一个示例。这里的语义是,订阅被延迟,直到某个地方调用了该示例上的subscribe。如果我没理解错的话,这正是你想要达到的目的。

相关问题