kotlin 当一个协同程序通道完成一项工作时如何得到通知

vbkedwbf  于 2023-01-31  发布在  Kotlin
关注(0)|答案(1)|浏览(94)

我有一个使用CouchBase lite的Android应用程序,我尝试保存文档并使用coroutin通道获得确认,我使用通道的原因是确保每个操作都在同一范围内完成
这里是我的尝试基于这选择的答案在这里How to properly have a queue of pending operations using Kotlin Coroutines?

object DatabaseQueue {
    private val scope = CoroutineScope(IOCoroutineScope)
    private val queue = Channel<Job>(Channel.UNLIMITED)

    init {
        scope.launch(Dispatchers.Default) {
            for (job in queue) job.join()
        }
    }

    fun submit(
        context: CoroutineContext = EmptyCoroutineContext,
        block: suspend CoroutineScope.() -> Unit
    ) {
        val job = scope.launch(context, CoroutineStart.LAZY, block)
        queue.trySendBlocking(job)
    }

    fun submitAsync(
        context: CoroutineContext = EmptyCoroutineContext,
        id: String,
        database: Database
    ): Deferred<Document?> {
        val job = scope.async(context, CoroutineStart.LAZY) {
            database.getDocument(id)
        }
        queue.trySendBlocking(job)
        return job
    }

    fun cancel() {
        queue.cancel()
        scope.cancel()
    }
}
fun Database.saveDocument(document: MutableDocument) {
    DatabaseQueue.submit {
        Timber.tag("quechk").d("saving :: ${document.id}")
        this@saveDocument.save(document)
    }
}

fun Database.getDocumentQ(id: String): Document? {
    return runBlocking {
        DatabaseQueue.submitAsync(id = id, database = this@getDocumentQ).also {
            Timber.tag("quechk").d("getting :: $id")
        }.await()
    }
}

这里我的问题是,当我有许多数据库操作要写入和读取时,读取的执行速度比写入的执行速度快,这给了我空结果,因此,我需要知道的是

  • 这是最好的方法吗?还是有另一个最佳解决方案
  • 如何处理作业并从通道返回结果以避免空结果
u91tlkcl

u91tlkcl1#

通过修改原来的解决方案,你实际上使它不能正常工作。整个想法是为每个提交的代码块创建一个不活动的协程,然后开始一个接一个地执行这些协程。在你的例子中,你向调用者暴露了一个Deferred,所以调用者能够开始执行协程,结果,协程不再顺序地运行,而是并发地运行。
在保持几乎相同的代码的同时,解决这个问题的最简单的方法是引入另一个Deferred,它不直接与排队协程紧密相连:

fun submitAsync(
    context: CoroutineContext = EmptyCoroutineContext,
    id: String,
    database: Database
): Deferred<Document?> {
    val ret = CompletableDeferred<Document?>()
    val job = scope.launch(context, CoroutineStart.LAZY) {
        ret.completeWith(runCatching { database.getDocument(id) })
    }
    queue.trySendBlocking(job)
    return ret
}

然而,根据你的情况,这可能是一个矫枉过正。例如,如果你不需要保证严格的FIFO排序,一个简单的Mutex就足够了。另外,请注意,返回futures/deferred只是为了等待它们的经典方法是协程中的反模式。我们应该简单地使用一个suspend函数并直接调用它。

相关问题