kafka与tcp之间的akka流网关

wbrvyc0a  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(487)

我试图用akka流做一些图表,我遇到了一些困难的情况。基本上我需要在kafka服务器和tcp服务器之间路由消息。困难的部分发生了,因为我有时不得不回答tcp服务器。
案例1:tcp->kafka(确定)
案例2:Kafka->tcp(确定)
情况3:tcp->tcp
案例4:nothing->tcp
案例3发生在我收到一些消息,在发布给Kafka之前,我不得不向服务器询问一些细节
案例4通过发送第一次握手来打开tcp连接。
我想到了反馈循环,source.actoref,fan out,但仍然无法构建它。
这是我想到的主要图表:

+------------------+
                                            |                  |
                                            | TCP msg          |
                                            | (Source.actorRef)|
                                            |                  |
                                            +----+-------------+
                                                 |
                                                 |
+------------+       +-----------------+    +----v----+      +--------------+
|            +------->                 +---->         +------>Kafka (Sink)  |
|   TCP      |       |   TLS           |    | Router  |      +--------------+
|   (flow)   |       |   (bi-dir flow) |    | (???)   |      +--------------+
|            <-------+                 <----+         <------+Kafka (Source)|
+------------+       +-----------------+    +---------+      +--------------+

我想我需要用graphstage构建一些个性化的图,但是错过了一些参考。专门为 Router ,它必须能够接受三个输入并将答案转发到两个不同的输出中。
如果你有任何线索,我会喜欢的。
提前谢谢!

bybem2ql

bybem2ql1#

我终于明白了。。。这是我的一个代码示例,如果它能帮助

class Msg
  case class Msg1(string: String) extends Msg
  case class Msg2(string: String) extends Msg
  case class Msg3(string: String) extends Msg

def optionFilter[T]: Flow[Option[T], T, NotUsed] = {
  import akka.stream.scaladsl.GraphDSL.Implicits._

val graph = GraphDSL.create() { implicit builder =>
  {
    def partitioner(o: Option[T]) = o.map(_ => 0).getOrElse(1)
    val partition: UniformFanOutShape[Option[T], Option[T]] =
      builder.add(Partition[Option[T]](2, partitioner))

    val flow = builder.add(Merge[T](1))

    partition.out(0) ~> Flow[Option[T]].collect { case Some(t) ⇒ t } ~> flow
    partition.out(1) ~> Flow[Option[T]].collect { case None    ⇒ None } ~> Sink.ignore

    FlowShape(partition.in, flow.out)
    }
  }
  Flow.fromGraph(graph)
  }

val msg1Filter = optionFilter[Msg1]

def router(
  source1: Source[Msg, ActorRef],
  source2: Source[Msg, _],
  sink1: Sink[Msg1, _],
  sink2: Sink[Msg2, _],
  flow: Flow[Msg3, Msg, _]
  ): ActorRef = {
  import akka.stream.scaladsl.GraphDSL.Implicits._

  val graph: Graph[SinkShape[Msg], NotUsed] = GraphDSL.create() { implicit builder =>
    val unzipper: UnzipWith3[Msg, Option[Msg1], Option[Msg2], Option[Msg3]] =
    UnzipWith { msg: Msg =>
      msg match {
        case msg1: Msg1 => (Some(msg1), None, None)
        case msg2: Msg2 => (None, Some(msg2), None)
        case msg3: Msg3 => (None, None, Some(msg3))
      }
    }

  val merge   = builder.add(Merge[Msg](3))
  val forward = builder.add(Merge[Msg](1))

  val unzip = builder.add(unzipper)

  source2 ~> merge ~> unzip.in
  forward.out ~> merge

  unzip.out0 ~> optionFilter[Msg1] ~> sink1
  unzip.out1 ~> optionFilter[Msg2] ~> sink2
  unzip.out2 ~> optionFilter[Msg3] ~> flow ~> merge

  SinkShape(forward.in(0))
}

val sink: Sink[Msg, NotUsed] = Sink.fromGraph(graph)

sink.runWith(source1)
}

val source1 = Source.actorRef(4096, OverflowStrategy.fail)
val source2 = Source(List(Msg2("from source2")))

val sink1: Sink[Msg1, Future[Done]] = Sink.foreach((msg: Msg1) => println(s"sink1: $msg"))
val sink2: Sink[Msg2, Future[Done]] = Sink.foreach((msg: Msg2) => println(s"sink2: $msg"))

val flow = Flow.fromFunction((msg: Msg3) => {
  val msg2 = Msg2("from the flow")
  println(s"flow: forward msg $msg to $msg2")
  msg2
})

val actor = router(source1, source2, sink1, sink2, flow)

actor ! Msg1("from source1 (actor)")
actor ! Msg2("from source1 (actor)")
actor ! Msg3("from source1 (actor)")

这将为您提供以下输出

sink2: Msg2(from source2)
sink1: Msg1(from source1 (actor))
sink2: Msg2(from source1 (actor))
flow: forward msg Msg3(from source1 (actor)) to Msg2(from the flow)
sink2: Msg2(from the flow)

希望对将来的人有帮助!

相关问题