使用akka stream和akka HTTP,我创建了一个每3秒轮询一次API的流,将结果解组到JsValue对象并将结果发送给actor。在下面的代码中可以看到:
// Source which performs an http request every 3 seconds.
val source = Source.tick(0.seconds,
3.seconds,
HttpRequest(uri = Uri(path = Path("/posts/1"))))
// Processes the result of the http request
val flow = Http().outgoingConnectionHttps("jsonplaceholder.typicode.com").mapAsync(1) {
// Able to reach the API.
case HttpResponse(StatusCodes.OK, _, entity, _) =>
// Unmarshal the json response.
Unmarshal(entity).to[JsValue]
// Failed to reach the API.
case HttpResponse(code, _, entity, _) =>
entity.discardBytes()
Future.successful(code.toString())
}
// Run stream
source.via(flow).runWith(Sink.actorRef[Any](processJsonActor,akka.actor.Status.Success(("Completed stream"))))
这是有效的,但是流在100个HttpRequests(滴答声)后关闭。
这种行为的原因是什么?
2条答案
按热度按时间qvtsj1bj1#
肯定是和
outgoingConnectionHttps
有关的。这是一个低级别的DSL,可能有一些错误配置的地方,这是导致这一点的(虽然我不能找出是哪一个)。文档实际上不鼓励使用这种DSL。
尝试使用更高级别的DSL,如缓存连接池
dw1jzc5e2#
一个潜在的问题是,
Sink.actorRef
没有背压信号,因此演员的邮箱可能会被填满。如果操作者无论何时接收到JsValue
对象,都在做一些可能需要很长时间的事情,那么使用Sink.actorRefWithAck
。例如:您需要更改actor来处理
initMessage
,并使用ackMessage
(使用sender ! ackMessage
)回复每个流元素的流。关于Sink.actorRefWithAck
的更多信息可以在这里找到。