akka流kafka web套接字客户端在30秒的非活动期后停止接收消息

4xy9mtcn  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(238)

我想阅读Kafka主题的消息并发送到web套接字客户端。我使用akkastreamkafka创建了一个示例web套接字服务器应用程序。
我使用Kafka控制台生产者脚本发送消息到Kafka主题。工作正常,浏览器客户端通过web套接字接收来自kafka主题的数据。
如果我在30秒内没有向主题发送消息,服务器会给出以下消息,
[kafkaserviceactor akka.actor.default-dispatcher-2][akka://kafkaserviceactor/system/kafka-consumer-2]消息[akka.kafka.kafkaconsumerator$internal$stop$]未将发件人发送给参与者[akka://kafkaserviceactor/system/kafka-consumer-2#-1039823616]未交付[1] 遇到死信。可以使用配置设置“akka.log死信”和“akka.log关机时死信”关闭或调整此日志记录。
在这个消息之后,websocket客户端不会从kafka主题获得任何新消息。我的客户需要重新连接才能开始接收来自主题的消息。
感谢您的反馈。

val incoming = Sink.foreach(println)

def kafkaOut(clientId: String) =
  Consumer.plainSource(consumerSettings.withGroupId(clientId), Subscriptions.topics(KAFKA_TOPIC)).map {
    r =>
      val value = r.value()
      TextMessage(value)
  }

val requestHandler: HttpRequest => HttpResponse = {
  case req @ HttpRequest(GET, Uri.Path("/stream"), _, _, _) =>
    req.header[UpgradeToWebSocket] match {
      case Some(upgrade) => {
        val clientId = "akka-kafka-client-" + clientIdGen.nextInt(1000) 
        upgrade.handleMessagesWithSinkSource(incoming, kafkaOut(clientId))
      }
      case None => HttpResponse(400, entity = "Not a valid websocket request")
    }

  case req: HttpRequest =>
    req.discardEntityBytes() //drain the incoming http entity stream
    HttpResponse(400, entity = "Unknown service")
}

private def startServer(): Unit = {
  val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
    Http().bind(interface = HOST, PORT)

  bindingFuture =
    serverSource.to(Sink.foreach { connection =>
      connection.handleWithSyncHandler(requestHandler)
    }).run()

  log.info(s"Server online at http://localhost:9000/")
}
//I do not get any error with following changes 
def kafkaOut(clientId: String) = {

val (queueSource, futureQueue) = peekMatValue(Source.queue[String](10, OverflowStrategy.fail))

futureQueue.map { queue =>
  Consumer.plainSource(consumerSettings.withGroupId(clientId), Subscriptions.topics(KAFKA_TOPIC))
    .runForeach(t => queue.offer(t.value()))
}

queueSource.map(x => TextMessage(x))
}                                                                                                                
//T is the source type, here String
//M is the materialization type, here a SourceQueue[String]
def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = {
val p = Promise[M]
val s = src.mapMaterializedValue { m =>
  p.trySuccess(m)
  m
}
(s, p.future)
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题