Swift 5.5并发性:如何序列化异步任务以替换maxConcurrentOperationCount = 1的操作队列?

pprl5pva  于 2022-12-22  发布在  Swift
关注(0)|答案(1)|浏览(176)

我目前正在迁移我的应用以使用Swift中的并发模型。我希望序列化任务以确保它们一个接一个地执行(没有视差)。在我的用例中,我希望侦听由通知中心发布的通知,并在每次发布新通知时执行任务。但我希望确保以前没有任务正在运行。它这相当于使用maxConcurrentOperationCount = 1的操作队列。
例如,我在应用中使用CloudKit和Core Data,并使用持久历史跟踪来确定商店中发生了哪些更改。在此Synchronizing a Local Store to the Cloud示例代码中,苹果使用操作队列来处理历史处理任务(在CoreDataStack中)。此OperationQueue的最大操作数设置为1。

private lazy var historyQueue: OperationQueue = {
    let queue = OperationQueue()
    queue.maxConcurrentOperationCount = 1
    return queue
}()

当收到Core Data通知时,会在该串行操作队列中添加一个新任务,因此如果收到很多通知,则会以串行方式依次执行。

@objc
func storeRemoteChange(_ notification: Notification) {
    // Process persistent history to merge changes from other coordinators.
    historyQueue.addOperation {
        self.processPersistentHistory()
    }
}

在此Loading and Displaying a Large Data Feed示例代码中,Apple使用任务处理历史记录更改(在QuakesProvider中)。

// Observe Core Data remote change notifications on the queue where the changes were made.
notificationToken = NotificationCenter.default.addObserver(forName: .NSPersistentStoreRemoteChange, object: nil, queue: nil) { note in
    Task {
        await self.fetchPersistentHistory()
    }
}

我觉得第二个项目中有问题,因为任务可以按任何顺序发生,而不一定是按顺序(与第一个项目相反,其中OperationQueue为maxConcurrentOperationCount = 1)。
我们是否应该在某处使用一个actor来确保方法被串行调用?
我考虑过这样的实现,但我还不是很适应:

actor PersistenceStoreListener {
    let historyTokenManager: PersistenceHistoryTokenManager = .init()
    private let persistentContainer: NSPersistentContainer

    init(persistentContainer: NSPersistentContainer) {
        self.persistentContainer = persistentContainer
    }

    func processRemoteStoreChange() async {
        print("\(#function) called on \(Date.now.formatted(date: .abbreviated, time: .standard)).")
    }
}

其中,当接收到新通知时将调用processRemoteStoreChange方法(AsyncSequence):

notificationListenerTask = Task {
   let notifications = NotificationCenter.default.notifications(named: .NSPersistentStoreRemoteChange, object: container.persistentStoreCoordinator)
   
   for await _ in notifications {
        print("notificationListenerTask called on \(Date.now.formatted(date: .abbreviated, time: .standard)).")
        await self.storeListener?.processRemoteStoreChange()
    }
}
db2dz4w8

db2dz4w81#

如果您想通过1maxConcurrentOperationCount(“串行”操作队列)获得OperationQueue的行为,可以通过actor实现。
串行OperationQueue有两种模式:
1.队列中的操作本身是同步的。
如果你使用标准的OperationQueue(也就是说,你还没有子类化OperationOperationisFinished做手动KVO,等等),一个简单的actor就可以达到我们想要的效果。
这里的关键是,这只适用于同步方法(即那些没有await挂起点的方法)。
1.队列中的操作是异步的。
操作队列的一个更高级的用例是处理异步任务之间的依赖关系。这在操作队列中是一个更复杂的场景,需要一个自定义的Operation子类,在其中手动处理isFinished的KVO,等等。(参见该模式的示例的答案)
使用Swift并发执行此操作的挑战在于参与者是可重入的(参见SE-0306中的可重入性讨论)。如果参与者的方法是异步的(即,具有async-await),则会引入挂起点,即,一个调用中的await将允许另一个async方法在该参与者上运行。
要实现不同async方法之间的串行执行,必须保留对前面的Task的引用并等待它。但是必须小心,其中调度方法(下面示例中的add)是同步的。
请考虑以下示例(它使用操作系统路标,以便我能够以图形方式说明Instruments中的行为):

import os.signpost

private let pointsOfInterest = OSLog(subsystem: "log", category: .pointsOfInterest)

class ViewController: UIViewController {

    let example = Example()
    let taskSerializer = SerialTasks<Void>()

    @IBAction func didTapSync(_ sender: Any) {
        os_signpost(.event, log: pointsOfInterest, name: #function)
        startSynchronous()
    }

    @IBAction func didTapAsync(_ sender: Any) {
        os_signpost(.event, log: pointsOfInterest, name: #function)
        Task { try await startAsynchronous() }
    }

    @IBAction func didTapSerializedAsync(_ sender: Any) {
        os_signpost(.event, log: pointsOfInterest, name: #function)
        Task { try await startSerializedAsynchronous() }
    }

    func startSynchronous() {
        Task {
            await example.synchronousExample("1. synchronous")
        }
    }

    func startAsynchronous() async throws {
        try await example.asynchronousExample("2. asynchronous")
    }

    func startSerializedAsynchronous() async throws {
        try await taskSerializer.add {
            try await self.example.asynchronousExample("3. serial async")
        }
    }
}

actor Example {
    func asynchronousExample(_ name: StaticString) async throws {
        let id = OSSignpostID(log: pointsOfInterest)
        os_signpost(.begin, log: pointsOfInterest, name: name, signpostID: id)
        defer { os_signpost(.end, log: pointsOfInterest, name: name, signpostID: id) }

        try await Task.sleep(for: .seconds(2))
    }

    func synchronousExample(_ name: StaticString) {
        let id = OSSignpostID(log: pointsOfInterest)
        os_signpost(.begin, log: pointsOfInterest, name: name, signpostID: id)
        defer { os_signpost(.end, log: pointsOfInterest, name: name, signpostID: id) }

        Thread.sleep(forTimeInterval: 2)
    }
}

actor SerialTasks<Success> {
    private var previousTask: Task<Success, Error>?

    func add(block: @Sendable @escaping () async throws -> Success) async throws -> Success {
        let task = Task { [previousTask] in
            let _ = await previousTask?.result
            return try await block()
        }
        previousTask = task
        return try await task.value
    }
}

对于同步任务(场景1),startSynchronous是最简单的,只需调用actor的synchronous方法,就可以串行执行。
对于异步任务(场景2)startAsynchronous,如果有await挂起点,则会由于执行元可重入而丢失顺序行为。
但是,您可以细化异步任务模式(场景3),方法是在上面的代码中使用一个actor SerialTasks,它跟踪前一个任务,在开始下一个任务之前等待它。一个微妙的点是add方法本身是同步的(尽管它所采用的闭包是异步的)。这避免了添加多个任务时的微妙竞争。
在Instruments中运行上述命令,我们可以图形化地看到执行情况,其中路标显示任务启动的位置,间隔显示任务执行的时间:

简而言之,如果您的actor只执行同步任务(这是您的情况),那么actor会自动产生maxConcurrentOperationCount = 1类型的行为;如果任务是异步的,您只需在开始下一个任务之前执行await之前的任务。

相关问题