在批处理过程中使用Kotlin协程时出现堆问题

wd2eg0qa  于 2023-01-26  发布在  Kotlin
关注(0)|答案(1)|浏览(141)

我想为列表中的每个元素调用一个API。
因此,我创建了下面的代码,这是一个扩展函数:

suspend fun <T, V> Iterable<T>.customAsyncAll(method: suspend (T) -> V): Iterable<V> {
    val deferredList = mutableListOf<Deferred<V>>()
    val scope = CoroutineScope(dispatchers.io)
    forEach {
      val deferred = scope.async {
        try {
          method(it)
        } catch (e: Exception) {
          log.error { "customAsyncAll Exception in $method method " + e.stackTraceToString())
          }
          throw e
        }
      }
      deferredList.add(deferred)
    }
    return deferredList.awaitAll()
  }

调用代码为:

val result = runBlocking{ list.customAsyncAll { apiCall(it) }.toList() }

我看到错误posting Resource Exhausted event: Java heap space。此代码有什么问题?
当一个API调用抛出异常时,剩余的courouting异步内容会被释放还是仍然占用堆空间?

olhwl3o2

olhwl3o21#

我猜你正在传递一个有点大的列表(50+项目)。我相信,使这么多的调用是问题所在,并且现实地说,我不认为你会有任何性能增益打开超过10个连接到API的时间。我的建议是限制并发调用的任何数量小于20。
有很多方法可以实现这一点,我推荐使用信号量。

suspend fun <T, V> Iterable<T>.customAsyncAll(method: suspend (T) -> V): Iterable<V> {
    val deferredList = mutableListOf<Deferred<V>>()
    val scope = CoroutineScope(Dispatchers.IO)
    val sema = Semaphore(10)
    forEach {
      val deferred = scope.async {
        sema.withPermit {
          try {
            method(it)
          } catch (e: Exception) {
            log.error { 
              "customAsyncAll Exception in $method method " 
                + e.stackTraceToString())
            }
            throw e
          }
        }
      }
      deferredList.add(deferred)
    }
    return deferredList.awaitAll()
  }
侧面标注

确保在完成创建后取消任何自定义CouroutineScope,请参见Custom usage

相关问题