Kotlin协程中是否有RxJava Subject的类似物?

yrdbyhpb  于 2023-05-23  发布在  Kotlin
关注(0)|答案(2)|浏览(198)

在2020年,许多Android开发人员都在谈论KotlinCoroutines。我试着去理解它以及协程如何在我的项目中帮助我。
所以我的问题是在RxJava Subjects的协程中是否有类似物?PublishSubject的最小值)。
我想要的-我使用PublishSubject将事件从ViewModel发送到我的View。我在onStart()方法上订阅eventsSubject并在onStop()方法上部署。
因此,KotlinCoroutines类似物的最低要求是:

  • 简单的测试(我使用TestSubscriber,它很棒)
  • 我想发送不带缓冲的事件
  • 易于订阅/取消订阅

下面是我的用例示例:
视图模型:

abstract class AbsStateViewModel<State, Event> : AbsViewModel() {
    private val stateSubject = BehaviorSubject.create<State>()
    private val eventSubject = PublishSubject.create<Event>()

    protected val requireState: State
        get() = stateSubject.value!!

    fun getStateObservable(): Observable<State> = stateSubject

    fun getEventObservable(): Observable<Event> = eventSubject

    protected fun sendEvent(event: Event) {
        eventSubject.onNext(event)
    }

    protected fun setState(state: State) {
        stateSubject.onNext(state)
    }
}

和用法:

viewModel.getEventObservable() // called on onAttach()
            .subscribe(
                    this::handleEvent,
                    this::defaultHandleException
            )
            .disposeOnDetach() // my extensions
uqcuzwp8

uqcuzwp81#

是的,在协同程序中有类似的RX主题,通道。如果你想重现PublishSubject的行为,你可以使用BroadcastChannel,如果你想重现BehaviorSubject的行为,你可以使用ConflatedBroadcastChannel

hgncfbus

hgncfbus2#

自Coroutines 1.4.0(2020年11月)以来,SharedFlowStateFlow是RxJava Subjects的新等价物。
PublishSubject可以替换为MutableSharedFlow()
ReplaySubject可以替换为MutableSharedFlow(Int.MAX_VALUE)
BehaviorSubject可以替换为MutableStateFlow(),但有一个特殊的警告!
MutableStateFlow将忽略相等项。它与BehaviorSubject().distinctUntilChanged()相同如果你想发射相等的值,你需要一个特殊的MutableSharedFlow

fun <T> newBehaviorFlow(init: T? = null) = MutableSharedFlow<T>(
  replay = 1,
  onBufferOverflow = BufferOverflow.DROP_OLDEST,
).also { init?.run { it.tryEmit(init) } }

对于您的用例,

abstract class AbsStateViewModel<State, Event> : AbsViewModel() {
    private val stateSubject = newBehaviorFlow<State>()
    private val eventSubject = MutableSharedFlow<Event>()

    protected val requireState: State
        get() = stateSubject.first()

    fun getStateObservable() = stateSubject.asStateFlow()

    fun getEventObservable() = eventSubject.asSharedFlow()

    protected fun sendEvent(event: Event) {
        eventSubject.tryEmit(event)
    }

    protected fun setState(state: State) {
        stateSubject.tryEmit(state)
    }
}

消费者:

viewModel.getEventObservable() // called on onAttach()
            .onEach(this::handleEvent)
            .catch(this::defaultHandleException)
            .launchIn(viewLifecycleOwner.lifecycleScope or viewModelScope) // from lifecycle-viewmodel-ktx:2.6.1

相关问题