我正在寻找akka.stream.scaladsl.Source
构造方法,它将允许我简单地从代码的不同位置发出下一个值(例如,监视系统事件)。
- 我需要类似于
Promise
的东西。Promise向Future
发出单个值。我需要向Source
发出多个值。 - 例如
monix.reactive.subjects.BehaviorSubject.onNext(_)
- 我不太在乎背压。
目前我已经使用monix和akka-streams实现了这个功能(代码如下),但我希望应该有一个只有akka-streams的解决方案:
import akka.stream.scaladsl.{Flow, Sink, Source}
import monix.reactive.subjects.BehaviorSubject
import monix.execution.Scheduler.Implicits.global
val bs = BehaviorSubject("") //monix subject is sink and source at the same time
//this is how it is currently implemented
def createSource() = {
val s1 = Source.fromPublisher(bs.toReactivePublisher) //here we create source from subject
Flow.fromSinkAndSourceCoupled[String, String](Sink.ignore, s1)
}
//somewhere else in code... some event happened
//this is how it works in monix.
val res:Future[Ack] = bs.onNext("Event #1471 just happened!") //here we emit value
4条答案
按热度按时间vsikbqxv1#
也许你正在寻找演员来源
文档中的示例:
通过这种方式,您可以通过actor系统向actor发送消息,这些消息将从
ActorSource
向下发送到流中。4dc9hkyq2#
Source
抽象,顾名思义,提供API来处理数据源。相反,您需要查看使用数据的抽象-Sink
。而Sink.foreach
操作是您正在寻找的,最有可能的是:https://doc.akka.io/docs/akka/current/stream/operators/Sink/foreach.html在您的示例中,代码如下所示:
希望这对你有帮助!
lmvvr0a83#
我想你要找的是
sink.foreach
。它为每个接收到的元素调用一个给定的过程。我想代码看起来像这样:本质上,对于流a源,所做的是,接收器尝试写入该流的每个元素。
编辑
我认为你正在寻找
maybe
。它创建一个源,一旦实现的承诺完成一个值发射。看看这个documentation编辑
futureSource也可以工作,它在成功完成后,将给定的未来源的元素流化。
如果有帮助的话,让我知道!!
bnl4lu3b4#
https://doc.akka.io/docs/akka/current/stream/operators/Source/fromIterator.html或https://doc.akka.io/docs/akka/current/stream/operators/Source/fromPublisher.html是您需要的,这取决于Source从何处使用数据。