使用akka流从S3使用文件时出现TimeoutException

yh2wf1be  于 2022-11-06  发布在  其他
关注(0)|答案(1)|浏览(173)

我正尝试使用akka流以流的方式从S3消耗一堆文件:

S3.listBucket("<bucket>", Some("<common_prefix>"))
  .flatMapConcat { r => S3.download("<bucket>", r.key) }
  .mapConcat(_.toList)
  .flatMapConcat(_._1)
  .via(Compression.gunzip())
  .via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
  .map(_.utf8String)
  .runForeach { x => println(x) }

不增加akka.http.host-connection-pool.response-entity-subscription-timeout,我得到
java.util.concurrent.TimeoutException: Response entity was not subscribed after 1 second. Make sure to read the response entity body or call discardBytes() on it.(对于 second 文件),在打印完第一个文件的最后一行之后,尝试访问第二个文件的第一行时。
我理解这个异常的本质。我不明白为什么第二个文件的请求已经在进行中,而第一个文件仍在处理中。我猜,这涉及到了一些缓冲。
有什么想法,如何摆脱这个异常 * 而不必 * 增加akka.http.host-connection-pool.response-entity-subscription-timeout

nqwrtyyt

nqwrtyyt1#

与其使用flatMapConcat将下载文件的处理合并到一个流中,不如尝试在外部流中具体化该流,并在向下游发送输出之前在那里完全处理它。然后,在准备好之前,不应该开始下载(并完全处理)下一个对象。
一般来说,您希望避免使用过多的流实体化来减少开销,但我认为这对于像这样执行网络I/O的应用程序来说是可以忽略的。
让我知道如果这样的工作:(警告:未测试)

S3.listBucket("<bucket>", Some("<common_prefix>"))
  .mapAsync(1) { result =>
    val contents = S3.download("<bucket>", r.key)
      .via(Compression.gunzip())
      .via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
      .map(_.utf8String)
      .to(Sink.seq)(Keep.right)
      .run()
    contents     
  }
  .mapConcat(identity)
  .runForeach { x => println(x) }

相关问题