akka http流服务器在来自队列的最后一条消息之后终止连接

qco9c6ql  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(215)

我有一个相当简单的应用程序,由一个kafka消费者坐在akka http流服务器后面组成。收到请求后,服务器将为指定用户启动一个新的使用者,并开始从队列中读取消息:

def consumer(consumerGroup: String, from: Int) = {
  val topicsAndDate = Subscriptions.assignmentOffsetsForTimes(partitions.map(_ -> (System.currentTimeMillis() - from)): _*)

  Consumer.plainSource[String, GenericRecord](consumerSettings.withGroupId(consumerGroup), topicsAndDate)
    .map(record => record.timestamp() -> messageFormat.from(record.value()))
    .map {
      //convert to json
    }
}

def routes: Route = Route.seal(
  pathSingleSlash {
    complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "Say hello to akka-http"))
  } ~
  path("stream") {
    //some logic to validate user

    log.info("Received request from {} with 'from'={}", user, from)
    complete(consumer(user, from))  
  })

startServer("0.0.0.0", 8080)

在使用者到达队列上的最新消息之前,服务工作正常。在返回此最新消息60秒后,每次都会终止与服务器的连接。我想让连接保持活动状态,因为队列每隔几分钟就会填充更多的消息。
我尝试过各种不同的配置选项,但似乎没有一个能给出预期的结果。我当前的配置如下所示:

akka {
  http {
    client {
      idle-timeout = 300s
    }
    server {
      idle-timeout = 600s
      linger-timeout = 15 min
    }
    host-connection-pool {
      max-retries = 30
      max-connections = 20
      max-open-requests = 32
      connecting-timeout = 60s
      client {
        idle-timeout = 300s
      }
    }
  }
}

我也试过使用 server.websocket.periodic-keep-alive-max-idle = 1 second 设置,但似乎没有任何区别。
如果我需要提供更多的相关信息,请告诉我。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题