我现在正在玩akka streams,并尝试了以下示例。
当请求某个http端点时,从kafka获取第一个元素。这是我写的代码和它的工作。
get {
path("ticket" / IntNumber) { ticketNr =>
val future = Consumer.plainSource(consumerSettings, Subscriptions.topics("tickets"))
.take(1)
.completionTimeout(5 seconds)
.runWith(Sink.head)
onComplete(future) {
case Success(record) => complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, record.value()))
case _ => complete(HttpResponse(StatusCodes.NotFound))
}
}
}
我只是想知道,这是否是一种意识形态的方式来处理(阿克卡)流。那么,有没有更“直接”的方式将kafka流连接到http响应流呢?
例如,发布时我会这样做:
val kafkaTicketsSink = Flow[String]
.map(new ProducerRecord[Array[Byte], String]("tickets", _))
.toMat(Producer.plainSink(producerSettings))(Keep.right)
post {
path("ticket") {
(entity(as[Ticket]) & extractMaterializer) { (ticket, mat) => {
val f = Source.single(ticket).map(t => t.description).runWith(kafkaTicketsSink)(mat)
onComplete(f) { _ =>
val locationHeader = headers.Location(s"/ticket/${ticket.id}")
complete(HttpResponse(StatusCodes.Created, headers = List(locationHeader)))
}
}
}
}
}
也许这也可以改进??
1条答案
按热度按时间2exbekwf1#
你可以用它来维持一个单一的背压流
Sink.queue
. 每次收到请求时,都可以从物化队列中提取元素。这应该给你回一个元素,如果可用,和反压力,否则。大致如下:
更多信息
Sink.queue
可以在文档中找到。