我正在构建一个需要调用AWS SageMaker端点以获取预测结果的服务。由于有很多请求,我正在探索Coroutine来并行化调用,因为我正在使用Kotlin。然而,我是协程的新手,结果似乎与Coroutines official guide有点不一致。
下面的代码显示了逻辑。我有一个特性/输入列表,并将它们划分为几个分区,每个分区发送到SageMaker,由使用async
的Coroutine支持。据我所知,协程是轻量级的,在挂起函数调用后,同一协程的线程可能会改变。但是,下面的实现日志显示的Coroutine线程总是相同的,除非我在suspend函数中取消注解delay(100)
。
知道为什么吗我担心的是对AWS SageMaker的调用总是阻塞线程和协程。
fun forecastBatch(
features: Set<String>
): Map<String, float> {
val partitions: List<List<String>> = features.chunked(MAX_BATCH_SIZE)
val result: MutableMap<String, float> = mutableMapOf()
runBlocking {
val deferreds = partitions.mapIndexed { idx, partition ->
CoroutineName("Coroutine-${idx}")
log.info("${Thread.currentThread().name} Coroutines: ${currentCoroutineContext()[CoroutineName]} for Sending Request ${idx}X")
val batchRequest = partition.joinToString("\n")
val forecastResult = invokeEndpoint(batchRequest)
log.info("${Thread.currentThread().name} Coroutines: ${currentCoroutineContext()[CoroutineName]} Receiving Request ${idx}X")
// ...
// Build Response
}
}
deferreds.awaitAll().forEach{
result += it
}
}
return result
}
suspend fun invokeEndpoint(input: String): String {
// Build Request
// ...
val result: InvokeEndpointResponse = sageMakerRuntimeClient.invokeEndpoint(request)
// If the next line is uncommented, the thread will change for the same coroutine
// delay(100)
return result
}
1条答案
按热度按时间a1o7rhls1#
因为
sageMakerRuntimeClient.invokeEndpoint()
不是一个挂起函数,所以invokeEndpoint()
函数中没有挂起点,所以它的行为就像普通的阻塞函数一样。您可以使用suspendCoroutine将传统的阻塞函数或回调风格的异步函数转换为Kotlinstlye suspend函数。