akka 为什么Source.tick在100个HttpRequests后停止?

lf3rwulv  于 2023-05-22  发布在  其他
关注(0)|答案(2)|浏览(195)

使用akka stream和akka HTTP,我创建了一个每3秒轮询一次API的流,将结果解组到JsValue对象并将结果发送给actor。在下面的代码中可以看到:

// Source which performs an http request every 3 seconds.
val source = Source.tick(0.seconds,
                         3.seconds,
                         HttpRequest(uri = Uri(path = Path("/posts/1"))))

// Processes the result of the http request
val flow = Http().outgoingConnectionHttps("jsonplaceholder.typicode.com").mapAsync(1) {
  
  // Able to reach the API.
  case HttpResponse(StatusCodes.OK, _, entity, _) =>
    // Unmarshal the json response.
    Unmarshal(entity).to[JsValue]

  // Failed to reach the API.
  case HttpResponse(code, _, entity, _) =>
    entity.discardBytes()
    Future.successful(code.toString())

}

// Run stream
source.via(flow).runWith(Sink.actorRef[Any](processJsonActor,akka.actor.Status.Success(("Completed stream"))))

这是有效的,但是流在100个HttpRequests(滴答声)后关闭。
这种行为的原因是什么?

qvtsj1bj

qvtsj1bj1#

肯定是和outgoingConnectionHttps有关的。这是一个低级别的DSL,可能有一些错误配置的地方,这是导致这一点的(虽然我不能找出是哪一个)。
文档实际上不鼓励使用这种DSL。
尝试使用更高级别的DSL,如缓存连接池

val flow = Http().cachedHostConnectionPoolHttps[NotUsed]("akka.io").mapAsync(1) {

    // Able to reach the API.
    case (Success(HttpResponse(StatusCodes.OK, _, entity, _)), _) =>
      // Unmarshal the json response.
      Unmarshal(entity).to[String]

    // Failed to reach the API.
    case (Success(HttpResponse(code, _, entity, _)), _) =>
      entity.discardBytes()
      Future.successful(code.toString())

    case (Failure(e), _) ⇒
      throw e
  }

  // Run stream
  source.map(_ → NotUsed).via(flow).runWith(...)
dw1jzc5e

dw1jzc5e2#

一个潜在的问题是,Sink.actorRef没有背压信号,因此演员的邮箱可能会被填满。如果操作者无论何时接收到JsValue对象,都在做一些可能需要很长时间的事情,那么使用Sink.actorRefWithAck。例如:

val initMessage = "start"
val completeMessage = "done"
val ackMessage = "ack"

source
  .via(flow)
  .runWith(Sink.actorRefWithAck[Any](
    processJsonActor, initMessage, ackMessage, completeMessage))

您需要更改actor来处理initMessage,并使用ackMessage(使用sender ! ackMessage)回复每个流元素的流。关于Sink.actorRefWithAck的更多信息可以在这里找到。

相关问题