我有一个相当简单的应用程序,由一个kafka消费者坐在akka http流服务器后面组成。收到请求后,服务器将为指定用户启动一个新的使用者,并开始从队列中读取消息:
def consumer(consumerGroup: String, from: Int) = {
val topicsAndDate = Subscriptions.assignmentOffsetsForTimes(partitions.map(_ -> (System.currentTimeMillis() - from)): _*)
Consumer.plainSource[String, GenericRecord](consumerSettings.withGroupId(consumerGroup), topicsAndDate)
.map(record => record.timestamp() -> messageFormat.from(record.value()))
.map {
//convert to json
}
}
def routes: Route = Route.seal(
pathSingleSlash {
complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "Say hello to akka-http"))
} ~
path("stream") {
//some logic to validate user
log.info("Received request from {} with 'from'={}", user, from)
complete(consumer(user, from))
})
startServer("0.0.0.0", 8080)
在使用者到达队列上的最新消息之前,服务工作正常。在返回此最新消息60秒后,每次都会终止与服务器的连接。我想让连接保持活动状态,因为队列每隔几分钟就会填充更多的消息。
我尝试过各种不同的配置选项,但似乎没有一个能给出预期的结果。我当前的配置如下所示:
akka {
http {
client {
idle-timeout = 300s
}
server {
idle-timeout = 600s
linger-timeout = 15 min
}
host-connection-pool {
max-retries = 30
max-connections = 20
max-open-requests = 32
connecting-timeout = 60s
client {
idle-timeout = 300s
}
}
}
}
我也试过使用 server.websocket.periodic-keep-alive-max-idle = 1 second
设置,但似乎没有任何区别。
如果我需要提供更多的相关信息,请告诉我。
暂无答案!
目前还没有任何答案,快来回答吧!