android 我是否正确使用了flatMapConcat?

kokeuurv  于 2023-05-05  发布在  Android
关注(0)|答案(2)|浏览(138)

从RxJava迁移到Flow在RxJava中,我使用Observable的concat方法首先发出一个存储在room中的列表,然后发出一个来自网络请求的列表。在Flow中,它看起来像是flatMapConcat。

@Singleton
class StoreRepository @Inject constructor(
    private val remoteDataSource: StoreRemoteDataSource,
    @DatabaseModule.DatabaseSource private val dbSource: LocalDataSource,
    @NetworkModule.IoDispatcher private val dispatcher: CoroutineDispatcher
    ) {


    @OptIn(FlowPreview::class)
    suspend fun getStoreFeed(latitude: Double, longitude: Double): Flow<List<Store>> = flow {
        withContext(dispatcher) {
            dbSource.fetchStores().flatMapConcat { localStores ->
                remoteDataSource.getStoreFeed(latitude, longitude)
                    .onStart {
                        Log.d("TRACE", "emit local stores while remote")
                        emit(localStores)
                    }
                    .onEach { remoteStores ->
                        Log.d("TRACE", "insert into room db")
                        dbSource.insertStores(remoteStores)
                    }
                    .catch { e ->
                        if (localStores.isEmpty()) {
                            Log.d("TRACE", "nothing in db and network fails")
                            // If both local and remote data sources fail, propagate error
                            throw e
                        }
                    }
            }
        }
    }

在我的仓库函数中,我注入的调度程序是dagger 2提供的Dispatchers.IO。
在我的视图模型中,我尝试使用collect方法观察getStoreFeed的2个排放

class StoreFeedViewModel @Inject constructor(
    private val storeRepository: StoreRepository
): ViewModel() {

    private val _storesResult = MutableLiveData<ResultState<List<Store>>>()
    val storesResult: LiveData<ResultState<List<Store>>> = _storesResult

    fun initializeFeed() {
        viewModelScope.launch {
            _storesResult.postValue(ResultState.Loading)
            try {
                storeRepository.getStoreFeed(
                    Constants.DEFAULT_LATITUDE,
                    Constants.DEFAULT_LONGITUDE
                ).collect { storeList ->
                    Log.d("TRACE", "received ${storeList.first()}")
                    _storesResult.postValue(
                        ResultState.Success(storeList)
                    )
                }
            } catch (ex: Exception) {
                Log.d("TRACE", "exception fetching from reppsiotry ${ex.message}")
                _storesResult.postValue(
                    ResultState.Failure("failed to get stores")
                )
            }
        }
    }

添加了print语句,但没有打印输出,调试不会在任何这些行停止。

bweufnob

bweufnob1#

您的主要问题是,您将连接流 Package 在一个flow构建器中,该构建器从不发出任何东西,因此它返回一个空流,当收集时,创建一个冷流,该冷流立即释放到GC。
里面的代码在我看来是可行的,但它很脆弱。通过逻辑步骤:

  • flatMapConcat是在源缓存数据上调用的,所以每次修改数据库时,里面的一切都会重复。不是你想要的。在发出缓存数据之后,您不希望缓存数据冗余地覆盖来自网络的附加值。而且你也不想立即从网络重新启动你的fetch。
  • 在内部流中,您将缓存新获取的值。这将触发顶部源流发出另一项。这本质上是一个无限递归循环。
  • 如果dbSource.fetchStores()是一个无限流,那么循环问题就不会发生,因为连接的第一个流永远不会完成。这就是为什么我说我认为它实际上会起作用。
  • 如果dbSource.fetchStores()是一个有限流,我认为你有一个概念上的问题。如果你只取一次东西,它应该是一个直接返回List<Store>的挂起函数,或者它可以是(不太常见的)一个返回Deferred<List<Store>>的非挂起函数。流程是一系列项目。对于从网络中获取的东西,我想不出一个理由,你想这样做的特定次数以外的1或无穷大。

除此之外,这个函数不应该被标记为suspend,因为它所做的只是创建一个冷流。
由于我们不希望每次更新数据库时都重新启动,因此flatMapConcat甚至不是一个合适的解决方案。假设dbSource.fetchStores()是一个无限的Flow,我会这样做:

fun getStoreFeed(latitude: Double, longitude: Double): Flow<List<Store>> = flow {
    Log.d("TRACE", "emit local stores while remote")
    // we emit only the initial value of database flow, the current cached value:
    val localStores = dbSource.fetchStores().first()
    emit(localStores) 

    // Then we can emit the rest from the network, while also caching them.
    val remoteFlow = remoteDataSource.getStoreFeed(latitude, longitude)
        .onEach { remoteStores ->
            Log.d("TRACE", "insert into room db")
            dbSource.insertStores(remoteStores)
        }
        .catch { e ->
            if (localStores.isEmpty()) {
                Log.d("TRACE", "nothing in db and network fails")
                // If both local and remote data sources fail, propagate error
                throw e
            }
        }
    emitAll(remoteFlow)
}.flowOn(dispatcher)

我使用了flowOn,因为与withContext相比,它减少了代码的缩进。我认为您甚至需要指定一个分派器是值得怀疑的,因为这个流中的代码看起来没有阻塞或与UI一起工作。

jtoj6r0c

jtoj6r0c2#

看起来您正在使用flatMapConcat将本地和远程存储组合到一个流中。但是,flatMapConcat返回一个包含连接值的新流,并且您不会从getStoreFeed函数返回这个新流。
要解决这个问题,需要在dbSource.fetchStores().flatMapConcat之前添加一个return语句,以返回flatMapConcat创建的新流。
下面是更新后的getStoreFeed函数:

suspend fun getStoreFeed(latitude: Double, longitude: Double): Flow<List<Store>> = flow {
    withContext(dispatcher) {
        dbSource.fetchStores().flatMapConcat { localStores ->
            remoteDataSource.getStoreFeed(latitude, longitude)
                .onStart {
                    Log.d("TRACE", "emit local stores while remote")
                    emit(localStores)
                }
                .onEach { remoteStores ->
                    Log.d("TRACE", "insert into room db")
                    dbSource.insertStores(remoteStores)
                }
                .catch { e ->
                    if (localStores.isEmpty()) {
                        Log.d("TRACE", "nothing in db and network fails")
                        // If both local and remote data sources fail, propagate error
                        throw e
                    }
                }
        }
    }
}

相关问题