我想阅读Kafka主题的消息并发送到web套接字客户端。我使用akkastreamkafka创建了一个示例web套接字服务器应用程序。
我使用Kafka控制台生产者脚本发送消息到Kafka主题。工作正常,浏览器客户端通过web套接字接收来自kafka主题的数据。
如果我在30秒内没有向主题发送消息,服务器会给出以下消息,
[kafkaserviceactor akka.actor.default-dispatcher-2][akka://kafkaserviceactor/system/kafka-consumer-2]消息[akka.kafka.kafkaconsumerator$internal$stop$]未将发件人发送给参与者[akka://kafkaserviceactor/system/kafka-consumer-2#-1039823616]未交付[1] 遇到死信。可以使用配置设置“akka.log死信”和“akka.log关机时死信”关闭或调整此日志记录。
在这个消息之后,websocket客户端不会从kafka主题获得任何新消息。我的客户需要重新连接才能开始接收来自主题的消息。
感谢您的反馈。
val incoming = Sink.foreach(println)
def kafkaOut(clientId: String) =
Consumer.plainSource(consumerSettings.withGroupId(clientId), Subscriptions.topics(KAFKA_TOPIC)).map {
r =>
val value = r.value()
TextMessage(value)
}
val requestHandler: HttpRequest => HttpResponse = {
case req @ HttpRequest(GET, Uri.Path("/stream"), _, _, _) =>
req.header[UpgradeToWebSocket] match {
case Some(upgrade) => {
val clientId = "akka-kafka-client-" + clientIdGen.nextInt(1000)
upgrade.handleMessagesWithSinkSource(incoming, kafkaOut(clientId))
}
case None => HttpResponse(400, entity = "Not a valid websocket request")
}
case req: HttpRequest =>
req.discardEntityBytes() //drain the incoming http entity stream
HttpResponse(400, entity = "Unknown service")
}
private def startServer(): Unit = {
val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
Http().bind(interface = HOST, PORT)
bindingFuture =
serverSource.to(Sink.foreach { connection =>
connection.handleWithSyncHandler(requestHandler)
}).run()
log.info(s"Server online at http://localhost:9000/")
}
//I do not get any error with following changes
def kafkaOut(clientId: String) = {
val (queueSource, futureQueue) = peekMatValue(Source.queue[String](10, OverflowStrategy.fail))
futureQueue.map { queue =>
Consumer.plainSource(consumerSettings.withGroupId(clientId), Subscriptions.topics(KAFKA_TOPIC))
.runForeach(t => queue.offer(t.value()))
}
queueSource.map(x => TextMessage(x))
}
//T is the source type, here String
//M is the materialization type, here a SourceQueue[String]
def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = {
val p = Promise[M]
val s = src.mapMaterializedValue { m =>
p.trySuccess(m)
m
}
(s, p.future)
}
暂无答案!
目前还没有任何答案,快来回答吧!