Kotlin并行处理收集?

kgsdhlau  于 2023-10-23  发布在  Kotlin
关注(0)|答案(5)|浏览(130)

我有一个对象的集合,我需要对它们执行一些转换。目前我正在使用:

var myObjects: List<MyObject> = getMyObjects()

myObjects.forEach{ myObj ->
    someMethod(myObj)
}

它工作得很好,但我希望通过并行运行someMethod()来加速它,而不是等待每个对象完成,然后再开始下一个。
在Kotlin中有什么方法可以做到这一点吗?比如doAsyncTask之类的。
我知道这在asked over a year ago时是不可能的,但现在Kotlin有了像doAsyncTask这样的协程,我很好奇是否有协程可以帮助

4nkexdtk

4nkexdtk1#

是的,这可以使用协程来完成。下面的函数对集合的所有元素并行应用一个操作:

fun <A>Collection<A>.forEachParallel(f: suspend (A) -> Unit): Unit = runBlocking {
    map { async(CommonPool) { f(it) } }.forEach { it.await() }
}

虽然定义本身有点神秘,但您可以轻松地按照预期应用它:

myObjects.forEachParallel { myObj ->
    someMethod(myObj)
}

并行Map可以用类似的方式实现,参见https://stackoverflow.com/a/45794062/1104870

jgovgodb

jgovgodb2#

Java Stream在Kotlin中使用简单:

tasks.stream().parallel().forEach { computeNotSuspend(it) }

但是,如果您使用的是Android,如果您想要与低于24的API兼容的应用程序,则无法使用Java 8。
你也可以按照你的建议使用协程。但到目前为止(2017年8月),它还不是语言的一部分,你需要安装一个外部库。有很好的指南与例子。

runBlocking<Unit> {
        val deferreds = tasks.map { async(CommonPool) { compute(it) } }
        deferreds.forEach { it.await() }
    }

请注意,协程是用非阻塞多线程实现的,这意味着它们可以比传统的多线程更快。我在下面的代码中对Stream并行与协程进行了基准测试,在这种情况下,协程方法在我的机器上快了7倍。* 但是你必须自己做一些工作来确保你的代码是“挂起”(非锁定)的,这可能是相当棘手的。* 在我的例子中,我只是调用delay,这是库提供的suspend函数。非阻塞多线程并不总是比传统多线程快。如果有许多线程什么都不做,只是等待IO,那么速度会更快,我的基准测试就是这样做的。
我的基准测试代码:

import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.async
import kotlinx.coroutines.experimental.delay
import kotlinx.coroutines.experimental.launch
import kotlinx.coroutines.experimental.runBlocking
import java.util.*
import kotlin.system.measureNanoTime
import kotlin.system.measureTimeMillis

class SomeTask() {
    val durationMS = random.nextInt(1000).toLong()

    companion object {
        val random = Random()
    }
}

suspend fun compute(task: SomeTask): Unit {
    delay(task.durationMS)
    //println("done ${task.durationMS}")
    return
}

fun computeNotSuspend(task: SomeTask): Unit {
    Thread.sleep(task.durationMS)
    //println("done ${task.durationMS}")
    return
}

fun main(args: Array<String>) {
    val n = 100
    val tasks = List(n) { SomeTask() }

    val timeCoroutine = measureNanoTime {
        runBlocking<Unit> {
            val deferreds = tasks.map { async(CommonPool) { compute(it) } }
            deferreds.forEach { it.await() }
        }
    }

    println("Coroutine ${timeCoroutine / 1_000_000} ms")

    val timePar = measureNanoTime {
        tasks.stream().parallel().forEach { computeNotSuspend(it) }
    }
    println("Stream parallel ${timePar / 1_000_000} ms")
}

在我的4核计算机上的输出:

Coroutine: 1037 ms
Stream parallel: 7150 ms

如果取消注解掉两个compute函数中的println,您将看到在非阻塞协程代码中,任务是按正确的顺序处理的,但不是用Streams。

yrefmtwq

yrefmtwq3#

要并行处理集合中的项,可以使用Kotlin Coroutines。例如,以下扩展函数并行处理项目并等待它们被处理:

suspend fun <T, R> Iterable<T>.processInParallel(
    dispatcher: CoroutineDispatcher = Dispatchers.IO,
    processBlock: suspend (v: T) -> R,
    ): List<R> = coroutineScope { // or supervisorScope
          map {
              async(dispatcher) { processBlock(it) }
          }.awaitAll()
    }

这是Iterable<T>类型的suspend扩展函数,它对项目进行并行处理,并返回处理每个项目的结果。默认情况下,它使用Dispatchers.IO调度器将阻塞任务卸载到共享线程池。必须从协程(包括带有Dispatchers.Main分派器的协程)或另一个suspend函数调用。
从协程调用的示例:

val myObjects: List<MyObject> = getMyObjects()

someCoroutineScope.launch {
    val results = myObjects.processInParallel {
        someMethod(it)
    }
    // use processing results
}

其中someCoroutineScopeCoroutineScope的instance。
或者,如果你只想启动并忘记,你可以使用这个功能:

fun <T> CoroutineScope.processInParallelAndForget(
    iterable: Iterable<T>,
    dispatcher: CoroutineDispatcher = Dispatchers.IO,
    processBlock: suspend (v: T) -> Unit
) = iterable.forEach {
    launch(dispatcher) { processBlock(it) }
}

这是CoroutineScope上的扩展函数,它不返回任何结果。默认情况下,它还使用Dispatchers.IO调度器。可以使用CoroutineScope或从其他协程调用。调用示例:

someoroutineScope.processInParallelAndForget(myObjects) {
    someMethod(it)
}

// OR from another coroutine:

someCoroutineScope.launch {
    processInParallelAndForget(myObjects) {
        someMethod(it)
    }
}

其中someCoroutineScopeCoroutineScope的instance。

ttcibm8c

ttcibm8c4#

根据Kotlin语言的官方指南,OP的代码应该是这样的:

val myObjects: List<MyObject> = getMyObjects()

runBlocking(Dispatchers.Default) { // or Dispatchers.IO as you like
    myObjects.map {
        async { someMethod(it) }
    }.awaitAll()
}
pod7payv

pod7payv5#

你可以使用RxJava来解决这个问题。

List<MyObjects> items = getList()

Observable.from(items).flatMap(object : Func1<MyObjects, Observable<String>>() {
    fun call(item: MyObjects): Observable<String> {
        return someMethod(item)
    }
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(object : Subscriber<String>() {
    fun onCompleted() {

    }

    fun onError(e: Throwable) {

    }

    fun onNext(s: String) {
        // do on output of each string
    }
})

通过订阅Schedulers.io(),在后台线程上调度一些方法。

相关问题