我正在尝试使用现代Swift并发性构建一个分块文件上传机制。有一个流文件阅读器,我用它来读取1 mb大小的文件。它有两个闭包nextChunk: (DataChunk) -> Void
和completion: () - Void
。第一个被调用的次数与从块大小的InputStream
读取的数据一样多。
为了使这个阅读器符合Swift并发性,我做了扩展并创建了AsyncStream
,它似乎是最适合这种情况的。
public extension StreamedFileReader {
func read() -> AsyncStream<DataChunk> {
AsyncStream { continuation in
self.read(nextChunk: { chunk in
continuation.yield(chunk)
}, completion: {
continuation.finish()
})
}
}
}
使用这个AsyncStream
,我迭代地读取一些文件,并像这样进行网络调用:
func process(_ url: URL) async {
// ...
do {
for await chunk in reader.read() {
let request = // ...
_ = try await service.upload(data: chunk.data, request: request)
}
} catch let error {
reader.cancelReading()
print(error)
}
}
这里的问题是,我不知道有任何限制机制不允许执行超过N个网络调用。因此,当我试图上传巨大的文件(5GB)内存消耗急剧增长。正因为如此,流式阅读文件的想法没有意义,因为它更容易将整个文件读入内存(这是一个笑话,但看起来像)。
相比之下,如果我使用一个好的旧GCD,一切都像一个魅力:
func process(_ url: URL) {
let semaphore = DispatchSemaphore(value: 5) // limit to no more than 5 requests at a given time
let uploadGroup = DispatchGroup()
let uploadQueue = DispatchQueue.global(qos: .userInitiated)
uploadQueue.async(group: uploadGroup) {
// ...
reader.read(nextChunk: { chunk in
let requset = // ...
uploadGroup.enter()
semaphore.wait()
service.upload(chunk: chunk, request: requset) {
uploadGroup.leave()
semaphore.signal()
}
}, completion: { _ in
print("read completed")
})
}
}
它的行为与AsyncStream
顺序运行时使用并发DispatchQueue
的行为不完全相同。所以我做了一点研究,发现可能TaskGroup
是我在这种情况下需要的。它允许并行运行异步任务等。
我试着这样做:
func process(_ url: URL) async {
// ...
do {
let totalParts = try await withThrowingTaskGroup(of: Void.self) { [service] group -> Int in
var counter = 1
for await chunk in reader.read() {
let request = // ...
group.addTask {
_ = try await service.upload(data: chunk.data, request: request)
}
counter = chunk.index
}
return counter
}
} catch let error {
reader.cancelReading()
print(error)
}
}
在这种情况下,内存消耗甚至比AsyncStream
迭代的例子更多!
我怀疑应该有一些条件,我需要挂起组或任务或其他东西,只有当有可能真正处理我要添加的这些任务时才调用group.addTask
,但我不知道如何做到这一点。
我发现了这个Q/A,并试图把try await group.next()
的每5块,但它没有帮助我在所有。
有没有类似于DispatchGroup
+ DispatchSemaphore
的机制,但用于现代并发?
**更新:**为了更好地展示所有3种方式之间的差异,这里是内存报告的屏幕截图
2条答案
按热度按时间x4shl7ld1#
关键问题是
AsyncStream
的使用。您的AsyncStream
阅读数据并生成块的速度比上传的速度快。考虑这个MCVE,我模拟了一个100个块的流,每个块1 mb:
然后呢
当我这样做的时候,我看到内存峰值达到150 MB,因为
AsyncStream
快速地产生了 * 所有 * 块:请注意,所有
Ⓢ
路标(显示Data
对象创建时间)在过程开始时都聚集在一起。注意,the documentation警告我们,序列生成值的速度可能比它们被消耗的速度更快:
元素的任意源生成元素的速度比调用者迭代时消耗的速度快。因此,
AsyncStream
定义了缓冲行为,允许流缓冲特定数量的最旧或最新元素。默认情况下,缓冲区限制为Int.max
,这意味着该值是无界的。不幸的是,
.bufferingOldest
和.bufferingNewest
的各种缓冲选择只会在缓冲区被填满时丢弃值。在一些AsyncStreams
中,这可能是一个可行的解决方案(例如,如果你跟踪用户位置,你可能只关心最近的位置),但是当上传文件块时,你显然不能让它在缓冲区耗尽时丢弃块。因此,与
AsyncStream
相比,只需使用自定义AsyncSequence
来 Package 文件阅读,直到实际需要时才会读取下一个块,从而显著减少峰值内存使用,例如:然后呢
这避免了一次加载内存中的所有100 MB。请注意,内存的垂直比例是不同的,但您可以看到峰值使用比上图少了100 MB,并且
Ⓢ
路标(显示数据何时被读入内存)现在分布在整个图中,而不是在开始时全部分布:现在,显然,我只是在模拟使用
Chunk
/Data
对象阅读一个大文件,并模拟使用Task.sleep
上传,但它希望说明了基本思想。总之,不要使用
AsyncStream
来读取文件,而是考虑使用自定义AsyncSequence
或其他模式来读取文件,因为需要块。其他一些观察:
Data
。基于文件的上传内存效率要高得多。无论资产的大小如何,在基于文件的资产期间使用的内存都将以kb为单位。您甚至可以完全关闭分块功能,无论文件大小如何,基于文件的上传都将使用很少的内存。URLSession
文件上传具有最小的内存占用。这也是我们进行基于文件上传的原因之一。xfb7svmp2#
我希望能够将异步任务放入一个队列中,类似于NSOperationQueue。我想限制并发操作的最大数量,并且设置优先级,以便高优先级任务在低优先级任务之前从队列中取出。
WWDC实验室的一位苹果工程师指出,你可以使用withCheckedContinuation来挂起任务。这提供了一个延续,您可以在以后调用该延续以重新启动任务。
这是我的跑步者的钥匙。
使用以下命令创建流道
然后添加任务
跑步者如下…