从lagom/akka kafka主题订阅服务器为websocket创建源代码

y0u0uwnf  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(408)

我希望我的lagom订户专用服务订阅kafka主题并将消息流式传输到websocket。我有一个服务定义如下使用此文档(https://www.lagomframework.com/documentation/1.4.x/scala/messagebrokerapi.html#subscribe-作为指导方针:

// service call
    def stream(): ServiceCall[Source[String, NotUsed], Source[String, NotUsed]]

    // service implementation
    override def stream() = ServiceCall { req =>
      req.runForeach(str => log.info(s"client: %str"))
      kafkaTopic().subscribe.atLeastOnce(Flow.fromFunction(
        // add message to a Source and return Done
      ))
      Future.successful(//some Source[String, NotUsed])

然而,我不太明白如何处理我的Kafka信息。这个 Flow.fromFunction 退货 [String, Done, _] 意味着我需要将这些消息(字符串)添加到在订阅服务器外部创建的源中。
所以我的问题有两个:1)如何创建一个akka流源来在运行时接收来自kafka主题订阅者的消息?2) 在流中,如何将kafka消息附加到所述源中?

nafvub8i

nafvub8i1#

你好像误解了lagom的服务api。如果您试图具体化服务调用主体中的流,那么您的调用没有任何输入;即。,

def stream(): ServiceCall[Source[String, NotUsed], Source[String, NotUsed]]

意味着当客户机提供 Source[String, NotUsed] ,服务将以实物形式响应。你的客户不是直接提供的;因此,你的签名应该是

def stream(): ServiceCall[NotUsed, Source[String, NotUsed]]

现在回答你的问题。。。
这实际上并不存在于scalagiter8模板中,但是java版本包含了一个他们称之为自治流的东西,它大致完成了您想要做的事情。
在scala中,这段代码看起来像。。。

override def autonomousStream(): ServiceCall[
  Source[String, NotUsed], 
  Source[String, NotUsed]
] = ServiceCall { hellos => Future {
    hellos.mapAsync(8, ...)
  }
}

由于您的调用不是Map到输入流,而是一个kafka主题,因此您需要执行以下操作:

override def stream(): ServiceCall[NotUsed, Source[String, NotUsed]] = ServiceCall { 
  _ => 
    Future {
      kafkaTopic()
        .subscribe
        .atMostOnce
        .mapAsync(...)
    }
}

相关问题