从RxJava迁移到Flow在RxJava中,我使用Observable的concat方法首先发出一个存储在room中的列表,然后发出一个来自网络请求的列表。在Flow中,它看起来像是flatMapConcat。
@Singleton
class StoreRepository @Inject constructor(
private val remoteDataSource: StoreRemoteDataSource,
@DatabaseModule.DatabaseSource private val dbSource: LocalDataSource,
@NetworkModule.IoDispatcher private val dispatcher: CoroutineDispatcher
) {
@OptIn(FlowPreview::class)
suspend fun getStoreFeed(latitude: Double, longitude: Double): Flow<List<Store>> = flow {
withContext(dispatcher) {
dbSource.fetchStores().flatMapConcat { localStores ->
remoteDataSource.getStoreFeed(latitude, longitude)
.onStart {
Log.d("TRACE", "emit local stores while remote")
emit(localStores)
}
.onEach { remoteStores ->
Log.d("TRACE", "insert into room db")
dbSource.insertStores(remoteStores)
}
.catch { e ->
if (localStores.isEmpty()) {
Log.d("TRACE", "nothing in db and network fails")
// If both local and remote data sources fail, propagate error
throw e
}
}
}
}
}
在我的仓库函数中,我注入的调度程序是dagger 2提供的Dispatchers.IO。
在我的视图模型中,我尝试使用collect方法观察getStoreFeed的2个排放
class StoreFeedViewModel @Inject constructor(
private val storeRepository: StoreRepository
): ViewModel() {
private val _storesResult = MutableLiveData<ResultState<List<Store>>>()
val storesResult: LiveData<ResultState<List<Store>>> = _storesResult
fun initializeFeed() {
viewModelScope.launch {
_storesResult.postValue(ResultState.Loading)
try {
storeRepository.getStoreFeed(
Constants.DEFAULT_LATITUDE,
Constants.DEFAULT_LONGITUDE
).collect { storeList ->
Log.d("TRACE", "received ${storeList.first()}")
_storesResult.postValue(
ResultState.Success(storeList)
)
}
} catch (ex: Exception) {
Log.d("TRACE", "exception fetching from reppsiotry ${ex.message}")
_storesResult.postValue(
ResultState.Failure("failed to get stores")
)
}
}
}
添加了print语句,但没有打印输出,调试不会在任何这些行停止。
2条答案
按热度按时间bweufnob1#
您的主要问题是,您将连接流 Package 在一个
flow
构建器中,该构建器从不发出任何东西,因此它返回一个空流,当收集时,创建一个冷流,该冷流立即释放到GC。里面的代码在我看来是可行的,但它很脆弱。通过逻辑步骤:
flatMapConcat
是在源缓存数据上调用的,所以每次修改数据库时,里面的一切都会重复。不是你想要的。在发出缓存数据之后,您不希望缓存数据冗余地覆盖来自网络的附加值。而且你也不想立即从网络重新启动你的fetch。dbSource.fetchStores()
是一个无限流,那么循环问题就不会发生,因为连接的第一个流永远不会完成。这就是为什么我说我认为它实际上会起作用。dbSource.fetchStores()
是一个有限流,我认为你有一个概念上的问题。如果你只取一次东西,它应该是一个直接返回List<Store>
的挂起函数,或者它可以是(不太常见的)一个返回Deferred<List<Store>>
的非挂起函数。流程是一系列项目。对于从网络中获取的东西,我想不出一个理由,你想这样做的特定次数以外的1或无穷大。除此之外,这个函数不应该被标记为
suspend
,因为它所做的只是创建一个冷流。由于我们不希望每次更新数据库时都重新启动,因此
flatMapConcat
甚至不是一个合适的解决方案。假设dbSource.fetchStores()
是一个无限的Flow,我会这样做:我使用了
flowOn
,因为与withContext
相比,它减少了代码的缩进。我认为您甚至需要指定一个分派器是值得怀疑的,因为这个流中的代码看起来没有阻塞或与UI一起工作。jtoj6r0c2#
看起来您正在使用
flatMapConcat
将本地和远程存储组合到一个流中。但是,flatMapConcat
返回一个包含连接值的新流,并且您不会从getStoreFeed
函数返回这个新流。要解决这个问题,需要在
dbSource.fetchStores().flatMapConcat
之前添加一个return语句,以返回flatMapConcat
创建的新流。下面是更新后的
getStoreFeed
函数: