Kotlin流离线缓存

8fq7wneg  于 2023-03-13  发布在  Kotlin
关注(0)|答案(2)|浏览(179)

我是Kotlin流的新手,我正在处理这个文档。Kotlin Flows。在这段代码中,数据源每五秒钟从API获取一次数据,然后发送出去。

这是我的示例数据源类。

我正在获取数据并发送出去。

class RemoteDataSourceImpl @Inject constructor(
private val api:CryptoApi
): RemoteDataSource {

override suspend fun cryptoList(): Flow<List<CryptoCoinDto>> {
    return flow {
        while (true){
            
            val data = api.getCoinList()
            emit(data)
            delay(5000L)
        }
    }
   }
}

这是我的示例存储库。

我正在Map数据并将其保存在房间数据库中。我想从房间数据库中获取数据并将其发送出去,这是因为单一真实源原则,但我仍然必须返回dataSource,因为如果我打开新流{},我无法访问dataSource的数据。当然,我可以通过使用RemoteDataSource类的List而不是Flowinside来解决这个问题。但我想理解这个示例。我怎么能在这里应用单一的真理来源。

class CoinRepositoryImpl @Inject constructor(
private val dataSource:RemoteDataSource,
private val dao: CryptoDao
):CoinRepository {

override fun getDataList(): Flow<List<CryptoCoin>> {

     dataSource.cryptoList().map { dtoList ->
        val entityList = dtoList.map { dto ->
            dto.toCryptoEntity()
        }
        dao.insertAll(entityList)
    }
    return dataSource.cryptoList().map {
        it.map { it.toCryptoCoin() }
    }

}
llew8vvj

llew8vvj1#

我个人不会为API返回Flow,因为从技术上讲,您一直只返回1个列表

override fun getDataList(): Flow<List<CryptoCoin>> {
    val localFlow = dao.getCryptoCoins()
    val apiFlow = flow<Unit> {
        // Since getDataList can't be suspend due to it returning a flow, we create a new flow simply to run a suspendable function
        getAndSaveCryptoCoinsFromApi()
    }.onStart { // If I don't add this onStart, then it won't work
        emit(Unit)
    }
    
    return localFlow.combine(apiFlow) { dbItems, _ ->
        dbItems
        // We just return DB items. Why? Since we are saving api items to the db, and since the db items return a flow, it will automatically emit values to the flow once new items are added to the db
    }
}

private suspend fun getAndSaveCryptoCoinsFromApi() {
    try {
        val result = api.getCryptoCoins().toDomain()
        val entities = result.map { it.toEntity() }
        dao.insertCoins(entities)
    } catch(e: Exception) { /**/ }

}
hs1ihplo

hs1ihplo2#

这实际上比看起来要复杂得多。流被设计成支持背压,这意味着它们通常只在需要时才生产物品。它们是被动的,而不是推动物品,而是从流中拉取物品。
(免责声明:这对于冷流是成立的,对于热流则不成立,但是cryptoList()是一个冷流。)
它的设计是为了大大简化消费者比生产者慢或者根本没有人在消费物品的情况,然后生产者停止生产,一切都很好。
在您的示例中,有两个使用者,因此这也更加复杂。您需要决定如果一个使用者比另一个慢,应该发生什么。例如,如果没有人从getDataList()收集数据,应该发生什么?有多个选项,每个选项需要稍微不同的方法:
1.停止使用源流并因此停止更新数据库。
1.随时更新数据库,如果没有人从getDataList()收集,则对项目进行排队。如果队列中的项目越来越多,该怎么办?
1.随时更新数据库,如果没有人从getDataList()收集,则丢弃项目。

附件一

这可以通过使用onEach()来完成:

return dataSource.cryptoList().onEach {
    // update db
}.map {
    it.map { it.toCryptoCoin() }
}

在此解决方案中,更新数据库是消耗getDataList()流的“副作用”。

附件2和附件3

在这种情况下,我们不能被动地等到有人向我们索要物品,我们需要主动地从源头流消费物品,并将其推送到下游流,所以我们需要一个热流:SharedFlow。此外,因为我们在本例中仍然是活动端,所以我们必须启动一个协程,它将在后台执行此操作。因此,我们需要一个CoroutineScope
解决方案取决于您的具体需求:你是否需要一个队列,如果队列超过了大小限制应该怎么办,等等,但它将类似于:

return dataSource.cryptoList().onEach {
    // update db
}.map {
    it.map { it.toCryptoCoin() }
}.shareIn(scope, SharingStarted.Eagerly)

您还可以阅读有关buffer()MutableSharedFlow的信息-它们可能对您有用。

相关问题