动态合并Akka流

n9vozmp4  于 2022-11-06  发布在  其他
关注(0)|答案(2)|浏览(139)

我尝试使用Akka流来构建一个pub sub总线,方法如下:
发布服务器为主题添加源流,订阅服务器指定主题并获取该主题的所有内容。但是,主题可以由多个发布服务器发布,并且发布服务器与订阅服务器都可以随时加入。
我的想法是合并所有源,然后将过滤后的源返回给订阅者。
然而,由于发布者可以在任何点加入,因此可以在进行订阅之后添加源,并且订阅者需要从该源获得数据,就像该主题的任何其他已发布数据一样。
是否有一种方法可以动态地管理流到源的合并,以便满足以下条件:

publish(topic: String, messages: Source[T])
subscribe(topic: String): Source[T]

这样,无论何时添加发布者,主题的订阅者都将在进行订阅后获得发布到与该主题相关的任何源的所有消息。
也很高兴听到替代方法。
谢了,Z

des4xlb0

des4xlb01#

你可能想看看这个Akka文件:使用MergeHubBroadcastHub构建dynamic pub-sub service
以下是分别使用MergeHubBroadcastHub作为动态扇入和扇出结点的示例代码。
其思想是将MergeHubBroadcastHub连接起来,以Flow via Flow.fromSinkAndSource的形式形成一个pub-sub通道:

val (bfSink, bfSource) = MergeHub.source[String](perProducerBufferSize).
  toMat(BroadcastHub.sink[String](bufferSize))(Keep.both).
  run

val busFlow: Flow[String, String, NotUsed] = Flow.fromSinkAndSource(bfSink, bfSource)

请注意,上述代码片段中的Keep.bothMergeHub.source[T]BroadcastHub.sink[T]生成了一个物化值(Sink[T, NotUsed], Source[T, NotUsed])的元组,它们具有以下方法签名:

object MergeHub {
  def source[T](perProducerBufferSize: Int): Source[T, Sink[T, NotUsed]] = // ...
  // ...
}

object BroadcastHub {
  def sink[T](bufferSize: Int): Sink[T, Source[T, NotUsed]] = // ...
  // ...
}

下面是一个简单的发布-订阅通道busFlow的示例代码(类似于Akka文档中的示例):

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import akka.NotUsed

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

val (bfSink, bfSource) = MergeHub.source[String](perProducerBufferSize = 32).
  toMat(BroadcastHub.sink[String](bufferSize = 256))(Keep.both).
  run

// Optional: avoid building up backpressure when there is no subscribers
bfSource.runWith(Sink.ignore)

val busFlow: Flow[String, String, NotUsed] = Flow.fromSinkAndSource(bfSink, bfSource)

测试busFlow

Source(101 to 103).map(i => s"Batch(A)-$i").
  delay(2.seconds, DelayOverflowStrategy.backpressure).
  viaMat(busFlow)(Keep.right).
  to(Sink.foreach{ case s: String => println("Consumer(1)-" + s) }).
  run

Source(104 to 105).map(i => s"Batch(B)-$i").
  viaMat(busFlow)(Keep.right).
  to(Sink.foreach{ case s: String => println("Consumer(2)-" + s) }).
  run

// Consumer(2)-Batch(B)-104
// Consumer(2)-Batch(B)-105
// Consumer(1)-Batch(B)-104
// Consumer(1)-Batch(B)-105
// Consumer(1)-Batch(A)-101
// Consumer(1)-Batch(A)-102
// Consumer(2)-Batch(A)-101
// Consumer(1)-Batch(A)-103
// Consumer(2)-Batch(A)-102
// Consumer(2)-Batch(A)-103

busFlow作为pub-sub通道,其输入通过bfSink发布到所有订阅者,而其输出则通过bfSource传输所有发布的元素。例如:

val p1 = Source.tick[Int](0.seconds, 5.seconds, 5).map(_.toString)
p1.runWith(bfSink)

val p2 = Source.tick[Int](2.seconds, 10.seconds, 10).map(_.toString)
p2.runWith(bfSink)

val s1 = bfSource
s1.runForeach(x => println(s"s1 --> $x"))

val s2 = bfSource
s2.runForeach(x => println(s"s2 --> $x"))

// s1 --> 5
// s2 --> 5
// s1 --> 10
// s2 --> 10
// s2 --> 5
// s1 --> 5
// s2 --> 5
// s1 --> 5
// s1 --> 10
// s2 --> 10
// s2 --> 5
// s1 --> 5
// ...

其他可能感兴趣的相关主题包括用于流完成控制的KillSwitch和用于将Stream元素从给定的生成器路由到动态的一组使用者的PartitionHub

vwhgwdsa

vwhgwdsa2#

以下是我最后做的事情。发布者和订阅者都可以出现和消失,无论订阅者何时加入,也无论发布者何时加入,订阅者都应该能够看到他们订阅的所有发布消息(按主题),而不管在订阅时哪些发布者是活动的。欢迎评论。

def main(args: Array[String]): Unit = {
   val actorSystem = ActorSystem("test")
   val materializerSettings = ActorMaterializerSettings(actorSystem)
   implicit val materializer = ActorMaterializer(materializerSettings)(actorSystem)
   implicit val ec: ExecutionContext = actorSystem.dispatcher

   val (queue, pub) = Source.queue[Int](100, akka.stream.OverflowStrategy.dropHead).toMat(Sink.asPublisher(true))(Keep.both).run()

   val p1 = Source.tick[Int](0.seconds, 5.seconds, 5)

   p1.runForeach(x=> {queue.offer(x)})

   val p2= Source.tick[Int](2.seconds,10.seconds, 10)
   p2.runForeach(x=> queue.offer(x))

   val s1 = Source.fromPublisher(pub)
   s1.runForeach(x=> println(s"s1 =======> ${x}"))

   val s2 = Source.fromPublisher(pub)
   s2.runForeach(x=> println(s"s2 =======> ${x}"))
}

相关问题