Swift TaskGroup: limiting memory usage with a large number of tasks

Posted by eufgjt7s on 2023-06-28 in Swift

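I'm trying to build a chunked file upload mechanism using modern Swift concurrency. I have a streamed file reader that reads files in 1 MB chunks. It has two closures: nextChunk: (DataChunk) -> Void and completion: () -> Void. The first one is called once for every chunk-sized piece of data read from the InputStream.
To make this reader work with Swift concurrency, I wrote an extension that creates an AsyncStream, which seemed like the best fit for this case.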

public extension StreamedFileReader {
    func read() -> AsyncStream<DataChunk> {
        AsyncStream { continuation in
            self.read(nextChunk: { chunk in
                continuation.yield(chunk)
            }, completion: {
                continuation.finish()
            })
        }
    }
}

Using this AsyncStream, I iterate over the file's chunks and make network calls like this:

func process(_ url: URL) async {
    // ...
    do {
        for await chunk in reader.read() {
            let request = // ...
            _ = try await service.upload(data: chunk.data, request: request)
        }
    } catch let error {
        reader.cancelReading()
        print(error)
    }
}

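The problem is that I don't know of any throttling mechanism that would prevent more than N network calls from running at once. As a result, memory consumption grows dramatically when I try to upload huge files (5 GB). That defeats the whole point of streaming the file, because I might as well read the entire file into memory (that's a joke, but that's how it looks).
By contrast, if I use good old GCD, everything works like a charm: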

func process(_ url: URL) {
    let semaphore = DispatchSemaphore(value: 5) // limit to no more than 5 requests at a given time
    let uploadGroup = DispatchGroup()
    let uploadQueue = DispatchQueue.global(qos: .userInitiated)
    uploadQueue.async(group: uploadGroup) {
        // ...
        reader.read(nextChunk: { chunk in
            let request = // ...
            uploadGroup.enter()
            semaphore.wait()
            service.upload(chunk: chunk, request: request) {
                uploadGroup.leave()
                semaphore.signal()
            }
        }, completion: {
            print("read completed")
        })
    }    
}

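The two versions don't behave exactly the same, because the GCD version uses a concurrent DispatchQueue and the AsyncStream loop runs sequentially. So I did a little research and found that TaskGroup might be what I need here. Among other things, it lets you run async tasks in parallel.
I tried it like this: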

func process(_ url: URL) async {
    // ...
    do {
        let totalParts = try await withThrowingTaskGroup(of: Void.self) { [service] group -> Int in
            var counter = 1
            for await chunk in reader.read() {
                let request = // ...
                group.addTask {
                    _ = try await service.upload(data: chunk.data, request: request)
                }
                counter = chunk.index
            }
            return counter
        }
    } catch let error {
        reader.cancelReading()
        print(error)
    }
}

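In this case, memory consumption is even higher than in the plain AsyncStream iteration example!
I suspect I need some condition that suspends the group (or the task, or something) so that group.addTask is only called when there is actually capacity to process the new task. I don't know how to do that, though.
I found this Q/A and tried putting try await group.next() after every 5th chunk, but it didn't help at all.
Is there a mechanism similar to DispatchGroup + DispatchSemaphore for modern concurrency?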

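**Update:** To show the differences between all three approaches more clearly, here are screenshots of the memory reports:

AsyncStream iteration

AsyncStream + TaskGroup (using try await group.next() every 5 chunks)

GCD DispatchQueue + DispatchGroup + DispatchSemaphore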

x4shl7ld (answer #1)

The key issue is your use of AsyncStream. It reads data and yields chunks faster than they can be uploaded.
Consider this MCVE, where I simulate a stream of 100 chunks of 1 MB each:

import Foundation
import os.log

private let log = OSLog(subsystem: "Test", category: .pointsOfInterest)

struct Chunk {
    let index: Int
    let data: Data
}

actor FileMock {
    let maxChunks = 100
    let chunkSize = 1_000_000
    var index = 0

    func nextChunk() -> Chunk? {
        guard index < maxChunks else { print("done"); return nil }
        defer { index += 1 }
        return Chunk(index: index, data: Data(repeating: UInt8(index & 0xff), count: chunkSize))
    }

    func chunks() -> AsyncStream<Chunk> {
        AsyncStream { continuation in
            index = 0
            while let chunk = nextChunk() {
                os_signpost(.event, log: log, name: "chunk")
                continuation.yield(chunk)
            }

            continuation.finish()
        }
    }
}

And then:

func uploadAll() async throws {
    try await withThrowingTaskGroup(of: Void.self) { group in
        let chunks = await FileMock().chunks()
        var index = 0
        for await chunk in chunks {
            index += 1
            if index > 5 {
                try await group.next()
            }
            group.addTask { [self] in
                try await upload(chunk)
            }
        }
        try await group.waitForAll()
    }
}

func upload(_ chunk: Chunk) async throws {
    let id = OSSignpostID(log: log)
    os_signpost(.begin, log: log, name: #function, signpostID: id, "%d start", chunk.index)
    try await Task.sleep(nanoseconds: 1 * NSEC_PER_SEC)
    os_signpost(.end, log: log, name: #function, signpostID: id, "end")
}

When I do that, I see memory spike to 150 MB, because the AsyncStream rapidly yields *all* of the chunks up front:

Note that all of the signposts (showing when the Data objects are created) are clustered at the start of the process.
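You can also see the eager production without Instruments. Below is a minimal, self-contained sketch in which Int values stand in for the 1 MB chunks (eagerStreamDemo is a hypothetical name). The build closure passed to AsyncStream.init runs synchronously and the default buffering policy is unbounded. So, assuming a producer like the FileMock above, every element is already buffered before the consumer asks for the first one:

```swift
// Minimal sketch: measure how far an AsyncStream producer gets before the
// consumer starts iterating. Int values stand in for 1 MB Data chunks.
func eagerStreamDemo(count: Int = 100) async -> (producedBeforeFirstRead: Int, consumed: Int) {
    var produced = 0

    // The build closure runs synchronously inside AsyncStream.init, and the
    // default buffering policy is .unbounded, so the producer never waits
    // for the consumer.
    let stream = AsyncStream<Int> { continuation in
        for i in 0..<count {
            produced += 1
            continuation.yield(i)
        }
        continuation.finish()
    }

    // At this point every element is already sitting in the stream's buffer.
    let producedBeforeFirstRead = produced

    var consumed = 0
    for await _ in stream {
        consumed += 1
    }
    return (producedBeforeFirstRead, consumed)
}
```

With real 1 MB chunks, the whole file would be in memory at that point. That is consistent with the clustered signposts.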
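Note that the documentation warns us that a sequence can produce values faster than they are consumed:
An arbitrary source of elements can produce elements faster than they are consumed by a caller iterating over them. Because of this, AsyncStream defines a buffering behavior, allowing the stream to buffer a specific number of oldest or newest elements. By default, the buffer limit is Int.max, which means the value is unbounded.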
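Unfortunately, the alternative buffering policies, .bufferingOldest and .bufferingNewest, only discard values once the buffer is full. For some AsyncStreams that might be a viable solution. For example, if you are tracking the user's location, you might only care about the most recent one. When uploading file chunks, though, you obviously cannot let the stream discard chunks whenever the buffer fills up.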
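To confirm what these policies do, yield more elements than the buffer can hold before anything is consumed. In this sketch (collect is a hypothetical helper), .bufferingOldest(5) keeps the first five values and drops later arrivals. .bufferingNewest(5) keeps the last five and evicts older ones:

```swift
// Minimal sketch: yield `count` values into a stream whose buffer may be
// bounded by `policy`, then drain it and return the values that survived.
func collect(count: Int = 100,
             policy: AsyncStream<Int>.Continuation.BufferingPolicy) async -> [Int] {
    let stream = AsyncStream(Int.self, bufferingPolicy: policy) { continuation in
        for i in 0..<count {
            // yield(_:) reports .dropped(_) once a bounded buffer is full;
            // ignoring that result is how chunks would silently go missing.
            _ = continuation.yield(i)
        }
        continuation.finish()
    }

    var received: [Int] = []
    for await value in stream {
        received.append(value)
    }
    return received
}
```

Neither bounded result would work for a file upload, because every chunk has to arrive.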
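So, rather than using AsyncStream, wrap the file reading in a custom AsyncSequence that doesn't read the next chunk until it is actually needed. This dramatically reduces peak memory usage, e.g.: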

struct FileMock: AsyncSequence {
    typealias Element = Chunk

    struct AsyncIterator : AsyncIteratorProtocol {
        let chunkSize = 1_000_000
        let maxChunks = 100
        var current = 0

        mutating func next() async -> Chunk? {
            os_signpost(.event, log: log, name: "chunk")

            guard current < maxChunks else { return nil }
            defer { current += 1 }
            return Chunk(index: current, data: Data(repeating: UInt8(current & 0xff), count: chunkSize))
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        return AsyncIterator()
    }
}

And then:

func uploadAll() async throws {
    try await withThrowingTaskGroup(of: Void.self) { group in
        var index = 0
        for await chunk in FileMock() {
            index += 1
            if index > 5 {
                try await group.next()
            }
            group.addTask { [self] in
                try await upload(chunk)
            }
        }
        try await group.waitForAll()
    }
}

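This avoids loading all 100 MB into memory at once. The vertical scale of the memory graph is different, but peak usage is 100 MB lower than in the graph above. The signposts (showing when data is read into memory) are also now spread throughout the run rather than clustered at the start: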

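Obviously, I'm only simulating reading a large file with Chunk/Data objects and simulating the upload with Task.sleep, but this hopefully illustrates the basic idea.
Bottom line: don't use AsyncStream to read the file. Instead, consider a custom AsyncSequence, or some other pattern that reads the file as the chunks are needed.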
A few other observations:

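  • You said you tried "putting try await group.next() after every 5th chunk". Perhaps you could show us what you tried. Note that this answer didn't say "every 5th chunk", but "every chunk after the 5th one". We can't comment on your attempt unless you show us what you actually tried (or provide an MCVE). As shown above, the "Points of Interest" tool in Instruments can show the actual concurrency.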
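  • By the way, when uploading large assets, consider file-based uploads rather than Data. File-based uploads are far more memory efficient: the memory used during a file-based upload is measured in kilobytes, regardless of the size of the asset. You could even turn off chunking entirely, and a file-based upload would still use very little memory, whatever the file size. URLSession file uploads have a minimal memory footprint, and that is one of the reasons we use them.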
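  • Another reason to use file-based uploads is that, on iOS in particular, you can combine them with a background session. With a background session, the user can even leave the app to do something else, and the upload will continue in the background. At that point, you might reconsider whether you need (or want) to do chunking at all.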
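The "every chunk after the 5th" pattern used in uploadAll works with any async sequence. Below is a sketch of it as a reusable helper. The forEachConcurrently name is hypothetical, and so is the InFlightGauge actor used to check it. The helper keeps at most maxConcurrent child tasks in flight: once the window is full, it awaits group.next() before adding another task:

```swift
// Run `operation` for every element of `sequence`, with at most
// `maxConcurrent` operations in flight at any moment (a sliding window).
func forEachConcurrently<S: AsyncSequence>(
    _ sequence: S,
    maxConcurrent: Int,
    _ operation: @escaping @Sendable (S.Element) async throws -> Void
) async throws where S.Element: Sendable {
    precondition(maxConcurrent > 0)
    try await withThrowingTaskGroup(of: Void.self) { group in
        var inFlight = 0
        for try await element in sequence {
            if inFlight >= maxConcurrent {
                // Window is full: wait for one child task to finish
                // (rethrowing its error, if any) before adding this element's task.
                _ = try await group.next()
                inFlight -= 1
            }
            group.addTask { try await operation(element) }
            inFlight += 1
        }
        // Wait for the last few tasks in the window.
        try await group.waitForAll()
    }
}
```

Because the source is a pull-based sequence, the loop also stops reading while the window is full. Data therefore doesn't pile up ahead of the uploads.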
xfb7svmp (answer #2)

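I wanted to be able to put async tasks into a queue, similar to NSOperationQueue. I also wanted to limit the maximum number of concurrent operations, and to set priorities so that high-priority tasks come off the queue before low-priority ones.
An Apple engineer at a WWDC lab pointed out that you can use withCheckedContinuation to suspend a task. This gives you a continuation that you can call later to resume the task.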
Suspending tasks with withCheckedContinuation is the key to my Runner.
Create a runner with:

static let analysis = Runner(maxTasks: 2)

Then add tasks:

try await Runner.analysis.queue(priority: Runner.Priority.high) { [weak self] in
    // Do work here; self is weak, so call through the optional
    try await self?.doSomethingExpensive()
}

The Runner is as follows:

import Foundation

protocol HasPriority {
    var priority:Double {get}
}


actor Runner  {
    //MARK: Initialisers
    
    /// Create runner with max tasks
    /// - Parameter maxTasks: count
    init(maxTasks: Int) {
        self.maxTasks = maxTasks
    }
    
    //MARK: Static/Class constants
    
    //MARK: Structures (enums / errors / notifications / etc)
    
    /// Concrete implementation of HasPriority
    enum Priority:HasPriority {
        case high //100
        case medium //50
        case low //0
        case custom(Double)
        //Note - date variants are not compatible with other cases, oldestFirst is not compatible with newestFirst
        case oldestFirst(Date)
        case newestFirst(Date)
        
        var priority: Double {
            switch self {
            case .high:
                return 100
            case .medium:
                return 50
            case .low:
                return 0
            case .custom(let value):
                return value
            case .oldestFirst(let date):
                return -date.timeIntervalSince1970
            case .newestFirst(let date):
                return date.timeIntervalSince1970
            }
        }
    }
    
    /// Tickets hold priority and continuation information.
    /// These are only modified or read by the actor after initial creation, so we don't have to worry about concurrency
    /// They're the operation holder
    private class Ticket:Identifiable {
        internal init(priority: Double,runner:Runner) {
            self.priority = priority
            self.runner = runner
        }

        let id = UUID()
        let priority:Double
        private var runner:Runner
        var continuation:CheckedContinuation<Void, Never>?
        var running:Bool = false
        
        func didFinish() async {
            await runner.didFinish(self)
        }
        
        func run() {
            running = true
            continuation!.resume()
        }
    }
    
    //MARK: Published vars
    
    //MARK: Vars

    private let maxTasks:Int
    
    //MARK: Coding Keys
    
    //MARK: Class Methods
    
    //MARK: Instance Methods
    
    
    /// Current running count
    private var runningCount:Int {
        return tickets.filter({ $0.running }).count
    }
    
    /// called to progress the ticket queue
    private func progress() {
        while(runningCount < maxTasks) {
            let notRunning = tickets.filter { !$0.running }
            let topPriority = notRunning.max { t1, t2 in
                t1.priority < t2.priority
            }
            
            guard let topPriority else {
                print("Queue emptied")
                return
            }
            
            //there may be multiple elements with max priority. If so, we want to run the first
            guard let next = notRunning.first(where: { $0.priority == topPriority.priority  }) else {
                fatalError("this should not be possible")
            }
            
            next.run()
        }
    }
    
    /// Must be called when a ticket finishes to remove it from the queue
    /// - Parameter ticket: ticket
    private func didFinish(_ ticket:Ticket) {
        //print("did finish ticket with priority: \(ticket.priority)")
        tickets.removeAll { $0.id == ticket.id }
        progress()
    }

    private var tickets:[Ticket] = []
    
    nonisolated
    /// Used in the continuation to add a ticket
    /// - Parameter ticket: ticket
    private func add(_ ticket:Ticket) {
        Task {
            await append(ticket)
        }
    }
    
    /// Actor isolated function to add ticket
    /// - Parameter ticket: ticket
    private func append(_ ticket:Ticket) {
        precondition(ticket.continuation != nil)
        tickets.append(ticket)
        progress()
    }
    
    nonisolated
    /// Queue an async task. The task is suspended (so no thread is required), then run according to priority in the queue
    /// If priorities are equal, then tasks are run in order of submission
    /// - Parameters:
    ///   - priority: Use Runner.Priority or create your own enum which conforms to HasPriority
    ///   - work: the async work to do
    /// - Returns: the task return value
    func queue<Success>(priority:HasPriority, work:@escaping  (() async throws -> Success) ) async throws -> Success {
        let ticket:Ticket = Ticket(priority: priority.priority, runner: self)
        
        defer {
            Task {
                await didFinish(ticket)
            }
        }
        
        await withCheckedContinuation({ continuation in
            ticket.continuation = continuation
            self.add(ticket)
            return ()
        })
        
        //If task has been cancelled while in the queue - we'll find out when we pull it off the queue and run it
        try Task.checkCancellation()
        
        return try await work()
        
    }
}
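The scheduling rule inside progress() can be checked on its own. The sketch below restates it as a pure function; pickNext, PendingTicket and oldestFirstPriority are hypothetical names, not part of the Runner. Like progress(), it takes the highest priority value and picks the earliest-submitted ticket among ties. It also shows why .oldestFirst(date) negates the timestamp: an older date yields a larger priority value.

```swift
import Foundation

// Stand-in for a queued, not-yet-running Ticket; array order = submission order.
struct PendingTicket {
    let id: Int
    let priority: Double
}

// Same two steps as progress(): find the top priority value, then take the
// first pending ticket with that value, so equal priorities run in FIFO order.
func pickNext(_ pending: [PendingTicket]) -> PendingTicket? {
    guard let top = pending.map(\.priority).max() else { return nil }
    return pending.first { $0.priority == top }
}

// Same mapping as Runner.Priority.oldestFirst: negate the timestamp so that
// an earlier date produces a larger priority value.
func oldestFirstPriority(_ date: Date) -> Double {
    -date.timeIntervalSince1970
}
```

This also suggests why the enum's comment warns against mixing the date-based cases with the fixed ones. Timestamps for current dates are around 1.7 billion seconds, which dwarfs the 0 to 100 range of .low, .medium and .high.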
