swift 线程安全合并发布者到AsyncStream

vof42yt1  于 2023-03-28  发布在  Swift
关注(0)|答案(2)|浏览(150)

我试图弄清楚这种方法是否是线程安全的,如果getStream()和update(value:...)将同时在不同线程上被调用?

final class SomeNotifier {

static let shared = SomeNotifier()

private let value = PassthroughSubject<String, Never>()
private var cancellables: Set<AnyCancellable> = []

private init() {}

func getStream() -> AsyncStream<String> {
    return AsyncStream { [weak self] continuation in
        guard let self = self else { return }

        self.value.sink { completion in
            switch completion {
            case .finished:
                continuation.finish()
            case .failure:
                continuation.finish()
            }
        } receiveValue: { value in
            continuation.yield(value)
        }
        .store(in: &cancellables)
    }
}

func update(value: String) {
    self.value.send(value)
}

我想有一些仓库,可以通知不同的观察员有关内部状态的变化

mzsu5hc0

mzsu5hc01#

在Cy-4AH的answer的变体中(其使用actor用于同步;+1),我会确保添加一个onTermination处理程序,以在异步序列被取消时删除相关的延续。例如:

actor Notifier<Output> {
    private var continuations: [UUID: AsyncStream<Output>.Continuation] = [:]

    func values() -> AsyncStream<Output> {
        AsyncStream { continuation in
            let id = UUID()
            continuations[id] = continuation

            continuation.onTermination = { _ in
                Task { await self.cancel(id) }
            }
        }
    }

    func send(_ value: Output) {
        for continuation in continuations.values {
            continuation.yield(value)
        }
    }
}

private extension Notifier {
    func cancel(_ id: UUID) {
        continuations[id] = nil
    }
}

关于这个主题有很多变化,但是实现的细节并不像一般的观察那样重要:(a)actor的使用;以及(B)使用onTermination处理程序来清除通知程序对象的寿命可能超过各个序列的情况。
FWIW,如果我真的想为String通知创建一个单例:

final class StringNotifier: Sendable {
    static let shared = StringNotifier()

    private init() { }

    private let notifier = Notifier<String>()

    func values() async -> AsyncStream<String> {
        await notifier.values()
    }

    func send(_ value: String) {
        Task { await notifier.send(value) }
    }
}

顺便说一句,我通常更喜欢使用AsyncChannel来处理这些类似“主体”的行为,但是单个通道不允许多个观察者,如果你试图拥有这些通道的集合,它(目前)不提供所需的类似onTermination的处理程序。
FWIW,如果你要使用合并PassthroughSubject,它可能看起来像:

actor CombineNotifier<Output> {
    private let subject = PassthroughSubject<Output, Never>()
    private var cancellables: [UUID: AnyCancellable] = [:]

    func values() -> AsyncStream<Output> {
        AsyncStream { continuation in
            let id = UUID()

            cancellables[id] = subject.sink { _ in
                continuation.finish()
            } receiveValue: { value in
                continuation.yield(value)
            }

            continuation.onTermination = { _ in
                Task { await self.cancel(id) }
            }
        }
    }

    func send(_ value: Output) {
        subject.send(value)
    }
}

private extension CombineNotifier {
    func cancel(_ id: UUID) {
        cancellables[id] = nil
    }
}

同样,它是一个actor来提供线程安全的交互,并使用onTermination来清理单个序列。

nwlqm0z1

nwlqm0z12#

实际上你不需要PassthroughSubject在update方法中你可以调用continuation.yield(value)你不需要[weak self],因为它没有转义闭包你可以使用actors来保证线程安全。例如:

final class SomeNotifier<Element>: AsyncSequence {
    private actor Helper {
        private var continuations = [AsyncStream<Element>.Continuation]()
        func append(continuation: AsyncStream<Element>.Continuation) {
            continuations.append(continuation)
        }
        func update(value: Element) {
            var removeIndexes = IndexSet()
            for (index, continuation) in continuations.enumerated() {
                if case .terminated = continuation.yield(value) {
                    removeIndexes.insert(index)
                }
            }
            continuations.remove(atOffsets: removeIndexes)
        }
        deinit {
            for continuation in continuations {
                continuation.finish()
            }
        }
    }

    private let helper = Helper()

    func makeAsyncIterator() -> AsyncStream<Element>.Iterator {
        let stream = AsyncStream<Element> { continuation in
            Task { [helper] in
                await helper.append(continuation:continuation)
            }
        }
        return stream.makeAsyncIterator()
    }

    func update(value: Element) {
        Task { [helper] in
            await helper.update(value: value)
        }
    }
}

相关问题