java Coroutine支持AWS呼叫似乎没有按预期工作

tv6aics1  于 2023-06-20  发布在  Java
关注(0)|答案(1)|浏览(91)

我正在构建一个需要调用AWS SageMaker端点以获取预测结果的服务。由于有很多请求,我正在探索Coroutine来并行化调用,因为我正在使用Kotlin。然而,我是协程的新手,结果似乎与Coroutines official guide有点不一致。
下面的代码显示了逻辑。我有一个特性/输入列表,并将它们划分为几个分区,每个分区发送到SageMaker,由使用async的Coroutine支持。据我所知,协程是轻量级的,在挂起函数调用后,同一协程的线程可能会改变。但是,下面的实现日志显示的Coroutine线程总是相同的,除非我在suspend函数中取消注解delay(100)
知道为什么吗我担心的是对AWS SageMaker的调用总是阻塞线程和协程。

fun forecastBatch(
        features: Set<String>
    ): Map<String, float> {
        val partitions: List<List<String>> = features.chunked(MAX_BATCH_SIZE)
        val result: MutableMap<String, float> = mutableMapOf()
        runBlocking {
            val deferreds = partitions.mapIndexed { idx, partition ->
                CoroutineName("Coroutine-${idx}")
                    log.info("${Thread.currentThread().name} Coroutines:  ${currentCoroutineContext()[CoroutineName]} for Sending Request ${idx}X")
                    val batchRequest = partition.joinToString("\n")

                    val forecastResult = invokeEndpoint(batchRequest)
                    
                    log.info("${Thread.currentThread().name} Coroutines:  ${currentCoroutineContext()[CoroutineName]}  Receiving Request ${idx}X")

                    // ...
                    // Build Response
                }
            }
            deferreds.awaitAll().forEach{
                result += it
            }
        }
        return result
    }

    suspend fun invokeEndpoint(input: String): String {
        // Build Request
        // ...        

        val result: InvokeEndpointResponse = sageMakerRuntimeClient.invokeEndpoint(request)
        
        // If the next line is uncommented, the thread will change for the same coroutine
        // delay(100)
        return result
    }
a1o7rhls

a1o7rhls1#

因为sageMakerRuntimeClient.invokeEndpoint()不是一个挂起函数,所以invokeEndpoint()函数中没有挂起点,所以它的行为就像普通的阻塞函数一样。您可以使用suspendCoroutine将传统的阻塞函数或回调风格的异步函数转换为Kotlinstlye suspend函数。

suspend fun invokeEndpoint(input:String) = suspendCoroutine { continuation ->
    // Build Request
    // ...  
    val result: InvokeEndpointResponse = sageMakerRuntimeClient.invokeEndpoint(request)
    continuation.resume(result)
}

相关问题