我们有一个服务使用Alpakka 3.0.4(Scala 2.13)从S3流传输多个大文件,压缩它们,并将压缩后的流作为HTTP响应发送出去,其思想是在压缩完成之前就开始发送,在文件到达之前就开始压缩,等等--所有这些都在背压方面进行了精心管理,这正是Akka Streams擅长的。
顺便说一下,HTTP服务器是Aleph,因为代码库实际上是Clojure,我们从Clojure调用Scala(这根本不是问题--事实上,比从Clojure调用Java容易得多)。Aleph支持以块为单位流式传输HTTP响应,这实际上是它的默认模式。
如果HTTP响应被完全消耗,一切都正常。响应的主体是压缩的输入流。一旦客户端完全消耗了它,流就被关闭,Akka会关闭所有其他流,包括原始的S3输入流。
但偶尔,客户端会在压缩流完全消耗完之前过早关闭连接(仍在调查原因),最终用户只会得到一个空的zip文件。我们在服务器端观察到S3流没有正确关闭,这导致连接泄漏,直到S3连接池耗尽。
处理这种不完全消费的正确方法是什么?
查看InputStreamSource
类的源代码时,我注意到以下几点:
override def postStop(): Unit = {
if (!isClosed) {
mat.tryFailure(new AbruptStageTerminationException(this))
}
}
private def failStream(reason: Throwable): Unit = {
closeInputStream()
mat.tryFailure(new IOOperationIncompleteException(readBytesTotal, reason))
}
当failStream
中的流被关闭时,有什么特殊的原因不关闭postStop
中的流吗?如果我们在一个图上调用runWith
,但得到的输入流没有被完全使用,它将在什么时候被调用?或者我应该查看其他内容吗?
更新,回应评论:
这个图非常简单,原始源代码由一系列元组(filename和Source
,由输入流构造)构成,这些元组流经Archive.zip
并进入输入流接收器。Clojure代码大致如下:
(let [tuples (reify Iterable
(iterator [_]
;; Some wrapper code that ultimately calls this for each item:
(Tuple2/apply (ArchiveMetadata/create filename)
(StreamConverters/fromInputStream
(reify Function0 (apply [_] attachment))))))
source (Source/apply tuples)
graph (.via source (Archive/zip))
sink (StreamConverters/asInputStream
(FiniteDuration/apply ^long timeout TimeUnit/SECONDS))
mat (Materializer/matFromSystem system)]
(.runWith graph sink mat))))
Scala的等价物如下所示:
val tuples = [some-iterable].map {
case Something(filename, attachment) =>
(ArchiveMetadata.create(filename),
StreamConverters.fromInputStream(attachment)
}
val source = Source(tuples)
val graph = source.via(Archive.zip())
val sink = StreamConverters.asInputStream(timeout seconds)
// Asssuming an implicit materializer from an ActorSystem
graph.runWith(sink) // returns an InputStream
输入流(attachment
,见上文)通常是S3流,但也可以来自其他存储库。它们不是通过调用Akka获得的。对于S3,它们是通过调用amazonica
获得的,amazonica
是Java AWS API的一个瘦Clojure Package 器。
更新2:为了重现此问题,我们使用了--head
的curl
请求。如果使用wget
而不是curl
,则会完全下载压缩文件,并且不会发现此问题(所有的流都被关闭)。如果我们通过请求头部而停止,压缩流(.runWith
返回的流)被关闭,但原始S3流(上面代码中的attachment
)未被关闭。
如果在执行.runWith
之前放置了断点,那么它也是可重现的。因此,可能涉及到争用条件。在服务器端不会抛出异常。
1条答案
按热度按时间6ss1mwsb1#
因此,Akka接收器没有正确地保护压缩的输入流,以免在源文件完全压缩之前耗尽。如果在结果
InputStream
上调用read
,并且它还没有数据,则整个阶段关闭。输入流是
akka.stream.impl.io.InputStreamAdapter
的一个示例(它实现java.io.InputStream
),在read
的实现中包含以下代码:我的印象是Akka流既可以防止背压(当生产者比消费者快时),也可以防止早期耗尽(当消费者比生产者快时),但显然不是!
或者我使用了错误的接收器?我需要一个特殊的接收器来处理HTTP响应吗?不是
Sink[_, InputStream]
,而是Sink[_, Future[HttpResponse]]
或者Sink[_, manifold.deferred.IDeferred]
(因为我们使用Aleph而不是Akka Http?不过,如果使用了合适的接收器,我可以很容易地从Future
转换为IDeferred
。有什么建议吗?