android 如何使用Kotlin协程正确地拥有一个挂起操作队列?

scyqe7ek  于 2023-06-28  发布在  Android
关注(0)|答案(2)|浏览(194)

后台

我在我工作的一个大型应用程序上发现了一些类/函数,它们的调用不应该在UI线程上(比如访问存储或DB)。
这样的操作可能会导致ANR,实际上我可以在Play控制台上看到ANR的百分比。
我想改变这一点,并希望通过使用Kotlin协程也有更多的代码顺序。
因此,目前我正在处理一个扩展BroadcastReceiver的类,因此它需要在UI线程上一个接一个地处理onReceive回调,每个回调都必须“等待”前一个回调完成。
在onReceive回调中,有些调用应该在后台线程上完成,有些应该在UI线程上完成。有时候有些情况下两者都有。
例如:

if( someCheckOnUiThread() && someDbOperation()) {
  ...
}

问题

我是Kotlin协程的新手,尽管我已经找到了如何处理这个问题的方法,但我很确定有一种更官方的方法来处理这个问题,因为我已经从其他人那里读到了一些提示和评论(here)。

我尝试过的

我所做的实际上是有效的,但它看起来更像是一个解决方案:

private val mainScope = MainScope()
private val backgroundWorkDispatcher: CoroutineDispatcher =
        java.util.concurrent.Executors.newFixedThreadPool(1).asCoroutineDispatcher()

然后在onReceive回调中使用它们:

@UiThread
override fun onReceive(somcContext: Context, intent: Intent) {
    val context = somcContext.applicationContext
    //use goAsync just because I'm in BroadcastReceiver
    val pendingAsyncResult = goAsync() 
    mainScope.launch {
        runInterruptible(backgroundWorkDispatcher) {
           // <-- some code here
        }
    }.invokeOnCompletion { throwable ->
        // last operation after done with everything here: 
        pendingAsyncResult.finish()
    }
    //done right away here, and yet the handling will be done one after another, freely
}

runInterruptible内部,我可以通过调用runBlocking(mainScope.coroutineContext) {}来访问UI线程,甚至在内部也可以使用cancel()取消任务。
使用runBlocking很重要,因为我需要在那里等待结果。当然,当有意义的时候,我可以使用替代方案,但是我也可以使用一个简单的处理程序,因为我不等待结果。
我还使用backgroundWorkDispatcher来确保所有的后台操作都在一个线程上,以等待下一个操作,一个接一个。

问题

这种解决方案的替代方案是什么?更优雅和/或更短的?更正式的?
注意,我需要处理UI一个接一个排队的操作,每个操作都等待前一个操作完成。BroadcastReceiver只是一个例子。我确信在代码中有(可悲的)更难修复的地方,但我想首先知道如何正确处理这个问题。

unhi4e5o

unhi4e5o1#

由于您在对另一个问题的评论中询问了线程队列,下面是我如何做协程作业队列。请记住,这是如果您需要每个提交的协程完全按顺序运行(根本没有并行工作),我不确定您上面所描述的是什么。

class JobQueue {
    private val scope = MainScope()
    private val queue = Channel<Job>(Channel.UNLIMITED)

    init { 
        scope.launch(Dispatchers.Default) {
            for (job in queue) job.join()
        }
    }

    fun submit(
        context: CoroutineContext = EmptyCoroutineContext,
        block: suspend CoroutineScope.() -> Unit
    ) {
        synchronized {
            val job = scope.launch(context, CoroutineStart.LAZY, block)
            queue.trySend(job)
        }
    }

    fun cancel() {
        queue.cancel()
        scope.cancel()
    }
}

您可以在object中或在顶层创建此类的示例,以使其持续应用的生命周期。这取决于您需要作业运行多长时间。我没有太多的BroadcastReceiver经验,但我知道它们是短暂的,所以如果他们在你的应用程序关闭屏幕时收到了一些东西,而协同程序需要的时间超过几秒钟,我不确定到底会发生什么。对于这类工作,我认为你需要迅速把它交给一个工作经理。但是如果你在应用程序显示在屏幕上的时候做一些事情,你可以使用协程。
以下操作将防止已提交作业的任何部分在任何以前提交到同一JobQueue示例的作业之前运行。

val jobQueue = JobQueue() // at top level so shared by all BroadcastReceivers

//...

override fun onReceive(someContext: Context, intent: Intent) {
    jobQueue.submit {
        val x = getSomething(someContext.applicationContext) // on main thread
        val y = withContext(Dispatchers.IO) {
            doSomeBlockingFetch() // not on main thread so safe to call blocking fun
        }
        doSomethingWithResult() // on main thread
    }
    // onReceive returns promptly on the main thread as required, but the JobQueue
    // prevents subsequent queue-submitted jobs from running before this one
    // is *completely* finished, including the final doSomethingWithResult() call
    // on the main thread.
}

关于问题中的代码:
创建单线程调度器可以防止使用该调度器的代码并行运行,而这可能正是您所希望的。但它不创建队列,也不保证执行顺序。假设我上面的例子是用你的解决方案完成的。操作系统快速连续地对onReceive进行了两次调用。doSomeBlockingFetch()部分不会使用单线程调度器并行运行,但不能保证它们将以何种顺序被调用,也不能保证随后的doSomethingWithResult()将以何种顺序被调用。
如果你想要一种不那么笨拙的方法来防止你的阻塞代码并行运行,并且如果你不关心IO后主线程工作的执行顺序,我会使用互斥体而不是单线程调度器:

val receiverIOMutex = Mutex() // at top level so shared by all BroadcastReceivers

//...

override fun onReceive(someContext: Context, intent: Intent) {
    anyCoroutineScope.launch(Dispatchers.Main.immediate) {
        val x = getSomething(someContext.applicationContext) // on main thread
        val y = receiverIOMutex.withLock {
            withContext(Dispatchers.IO) {
                doSomeBlockingFetch() // not on main thread so safe to call blocking fun
            }
        }
        doSomethingWithResult() // on main thread
    }
}

下面是一个带有热SharedFlow的Job Queue类的示例,因为您需要它,但这将是一个奇怪的选择。当已经有Channels时,SharedFlow被添加到Kotlin的全部原因是为多个订阅者提供一种无需消费就可以获取值的方法,而不是每个值只允许消费一次,无论谁在阅读它。但是对于一个作业队列,我们不希望有多个订阅者,我们只希望每个作业使用一次。因此,如果您使用SharedFlow执行此操作,就像使用扳手敲钉子一样。它会起作用,但不像用锤子那么优雅,而且有更多的误用或事故的风险。

class JobQueue {
    private val scope = MainScope()
    private val queue = MutableSharedFlow<Job>(extraBufferCapacity = Int.MAX_VALUE)

    init { 
        queue.onEach { it.join() }
            .flowOn(Dispatchers.Default)
            .launchIn(scope)
    }

    fun submit(
        context: CoroutineContext = EmptyCoroutineContext,
        block: suspend CoroutineScope.() -> Unit
    ) {
        synchronized {
            val job = scope.launch(context, CoroutineStart.LAZY, block)
            queue.tryEmit(job)
        }
    }

    fun cancel() {
        scope.cancel()
    }
}
x759pob2

x759pob22#

我还使用backgroundWorkDispatcher来确保所有后台操作都在单个线程上,以等待下一个操作,一个接一个。
这不是协程世界中单个线程所强制执行的。单个线程阻止了 * 并行性 *,但并不阻止 * 并发性 *。如果在单线程调度器中执行launch 2个协程,那么第二个协程很可能在第一个协程结束之前开始,假设第一个协程至少有一个挂起点(调用挂起函数)。这种协同程序执行的交错就是并发的含义。参见this other question,它源于相同的误解。
基本上,使用launchasync协程构建器(或者更一般地,启动多个协程)表示这些代码片段之间的并发性。虽然可以设计一个自定义分派器,强制一个协程在下一个协程开始之前完成,但这与协程的预期相反。
现在,这并不意味着你不能在你的情况下做任何事情,它只是意味着每个事件启动一个协程可能不是正确的事情(也许这是好的,虽然,请参阅下面的编辑)。相反,我建议创建一个channel来表示要处理的事件队列,并在某个地方启动一个协程来处理这些事件(因此它正确地表示您不需要并发)。在onReceive中,每个事件不启动一个协程,而只是发送到通道(在您的情况下可能使用sendBlocking,因为onReceive不是suspend)。
现在,关于在哪里启动“演员”协程,我想说这取决于。您可以为通道和协程指定您想要的作用域(我的意思是“作用域”是指变量可见性范围,而不是协程)。例如,如果你只想在这个特定的BroadcastReceiver的事件之间强制非并发性,我会说将通道声明为这个广播接收器的属性,并在初始化时启动协程(例如:在init块中),该CoroutineScope的作用域为广播接收器的生命周期(如果没有lifecycleScope,请自己创建作用域,并在销毁BroadcastReceiver时取消它)。
在onReceive回调中,有些调用应该在后台线程上完成,有些应该在UI线程上完成。有时候有些情况下两者都有。
这不是一个大问题。协程通过使用withContext(someDispatcher)使线程之间的切换变得容易。请注意,withContext在返回之前会等待lambda中的任何代码。这意味着即使在线程之间切换,代码仍然是顺序的。同步已为您处理。
下面是一些关于它外观的示例代码:

data class MyEvent(
    ... // add properties containing the data you need for processing
    val pendingResult: PendingResult,
)

// in the broadcast receiver
class MyBroadcastReceiver : BroadcastReceiver() {

    private val eventProcessorScope = MainScope(CoroutineName("my-receiver-event-processor"))

    private val events = Channel<MyEvent>()

    init {
        eventProcessorScope.launch {
            for (e in events) {
                try {
                    process(e)
                } finally {
                    e.pendingResult.finish()
                }
            }
        }
    }

    @UiThread
    override fun onReceive(someContext: Context, intent: Intent) {
        val pendingResult = goAsync()
        // extract the necessary data from the context or intent
        val event = YourEvent(..., pendingResult)
        eventChannel.sendBlocking(event)
    }

    suspend fun process(event: MyEvent) {
        // process the event here

        // do something on UI thread

        withContext(Dispatchers.Default) {
            // do some CPU-bound work
        }

        // do something else on UI thread after CPU work

        withContext(Dispatchers.IO) {
            // do some blocking call
        }
    }

    fun close() {
        eventProcessorScope.cancel()
    }
}

编辑:从goAsync的文档中,我意识到使事件处理异步实际上会阻止接收其他事件,我假设这会阻止对onReceive的并发调用:
请记住,您在这里所做的工作将阻止进一步的广播,直到它完成
这意味着您实际上可以启动任意多个协程来处理这些事件,只要您在处理结束时完成goAsync未决结果即可。但是不需要runBlocking/runInterruptible,只需要在这里和那里使用withContext就可以了。

相关问题