AndroidKotlin流运算符-等待所有流发出

olhwl3o2  于 2022-12-19  发布在  Kotlin
关注(0)|答案(1)|浏览(112)

我目前正在学习Android上的Kotlin流操作符,并希望发出一个网络请求或DB操作,然后发出并行请求,并等待所有流返回。

data class Category(var id: Int, var name: String, var parentCategoryId: Int?) {
        var subCategories: List<Category> = listOf()
    }

    data class Catalogue(var categories: List<Category>) {}

    // request to fetch top level categories
    fun getTopCats(): Flow<List<Category>> {
        return flowOf(
            listOf(
                Category(0, "Dairy", null),
                Category(1, "Fruits", null),
                Category(2, "Vegetables", null)
            )
        )
    }

    // request to fetch sub categories
    suspend fun getSubCats(catId: Int): Flow<List<Category>> {
        return when (catId) {
            0 -> flowOf(listOf(Category(3, "Milk", 0), Category(4, "Butter", 0)))
                .onEach { delay(1000) }
            1 -> flowOf(
                listOf(
                    Category(5, "Banana", 1),
                    Category(6, "Mandarin", 1),
                    Category(7, "Orange", 1)
                ).onEach { delay(2000) }
            )
            2 -> flowOf(
                listOf(
                    Category(8, "Carrot", 2),
                    Category(9, "Asparagus", 2),
                    Category(10, "Lettuce", 2)
                ).onEach { delay(3000) }
            )
            else -> flowOf()
        }
    }

第一次尝试-我想我做错了什么,因为子类别没有被提取。我应该把合并操作符放在哪里?

viewModelScope.launch {
        val catalogue = Catalogue(listOf())
        getTopCats().map {
            catalogue.categories = categories // assign top level category

            val flows = arrayListOf<Flow<List<Category>>>()
            categories.onEach { cat ->
                flows.add(getSubCats(cat.id))
            }
            combine(flows) { array ->
                array.onEach { list ->
                    catalogue.categories[list.first().id].subCategories = list
                }
            }
            catalogue
        }.flowOn(Dispatchers.Default).collect() {
            Timber.d("Received catalogue object")
        }
    }
jxct1oxe

jxct1oxe1#

您可以使用combine函数组合多个流并收集所有流的最新结果。
返回一个Flow,该Flow的值是使用转换函数通过组合每个Flow最近发出的值生成的。
例如:

// Some individual flows to return a single value
val getA = flowOf(1)
val getB = flowOf(2)
val getC = flowOf(3)

// Combine into a single flow that emits a list of the individual
// flow latest results
val combined = combine(getA, getB, getC) { a, b, c ->
    // Combine the results into whatever data structure
    // you want - here I made a list
    listOf(a,b,c)
}

MainScope().launch {
    combined.collect { results ->
        println("Got $results") // prints [1, 2, 3]
    }
}

Combine还可以获取任意长度的流列表,如果您有很多流的话,并且它返回值的数组(要求所有流返回相同的类型)

val manyFlows = listOf(getA, getB, getC)
val combined = combine(manyFlows) { result ->
    // result is an Array<T> where manyFlows is List<Flow<T>>
    result.toList()
}
    • 编辑**

作为一个更完整的示例,下面是如何获取顶级类别列表,然后将这些类别组合到一个流列表中,使用combine一次性调用所有流

suspend fun getData() {
    val top = getTopCats()
    top.collect { result ->
        // Get the result of the first flow
        val subcatFlows = result.map { getCatCount(it) }

        // Create a new flow to retrieve some count integer
        // from each of the top categories
        val allSubCats = combine(subcatFlows) { counts ->
            // produce a map of categories to counts
            result.zip(counts.toList()).toMap()
        }

        // Call the new combined flow to collect the
        // counts all at once
        allSubCats.collect { results ->
            println("Got $results") // prints {A=1, B=2, C=3}
        }
    }
}

// request to fetch the count for a given
// category by name
private fun getCatCount(name: String): Flow<Int> {
    return when(name) {
        "A" -> flowOf(1)
        "B" -> flowOf(2)
        "C" -> flowOf(3)
        else -> flowOf(-1)
    }
}

// request to fetch top level categories
private fun getTopCats(): Flow<List<String>> {
    return flowOf(listOf("A","B","C"))
}

那么你可以在协程中调用getData()

相关问题