我试图弄清楚这种方法是否是线程安全的,如果getStream()和update(value:...)将同时在不同线程上被调用?
final class SomeNotifier {
static let shared = SomeNotifier()
private let value = PassthroughSubject<String, Never>()
private var cancellables: Set<AnyCancellable> = []
private init() {}
func getStream() -> AsyncStream<String> {
return AsyncStream { [weak self] continuation in
guard let self = self else { return }
self.value.sink { completion in
switch completion {
case .finished:
continuation.finish()
case .failure:
continuation.finish()
}
} receiveValue: { value in
continuation.yield(value)
}
.store(in: &cancellables)
}
}
func update(value: String) {
self.value.send(value)
}
我想有一些仓库,可以通知不同的观察员有关内部状态的变化
2条答案
按热度按时间mzsu5hc01#
在Cy-4AH的answer的变体中(其使用
actor
用于同步;+1),我会确保添加一个onTermination
处理程序,以在异步序列被取消时删除相关的延续。例如:关于这个主题有很多变化,但是实现的细节并不像一般的观察那样重要:(a)
actor
的使用;以及(B)使用onTermination
处理程序来清除通知程序对象的寿命可能超过各个序列的情况。FWIW,如果我真的想为
String
通知创建一个单例:顺便说一句,我通常更喜欢使用
AsyncChannel
来处理这些类似“主体”的行为,但是单个通道不允许多个观察者,如果你试图拥有这些通道的集合,它(目前)不提供所需的类似onTermination
的处理程序。FWIW,如果你要使用合并
PassthroughSubject
,它可能看起来像:同样,它是一个
actor
来提供线程安全的交互,并使用onTermination
来清理单个序列。nwlqm0z12#
实际上你不需要
PassthroughSubject
在update方法中你可以调用continuation.yield(value)
你不需要[weak self]
,因为它没有转义闭包你可以使用actors来保证线程安全。例如: