使用Kotlin流在插入事件时处理事件

yc0p9oo0  于 2023-03-30  发布在  Kotlin
关注(0)|答案(1)|浏览(145)

我有这样一个管理器类,它处理这样的事件。

fun init() {
    job = CoroutineScope(ioDispatcher).launch {
        combine(
            networkMonitor.isOnline,
            fcmEventsRepository.getPendingFcmEvent()
        ) { isOnline, event ->
            isOnline to event
        }.collect { (isOnline, event) ->
            if (isOnline && event != null) {
                processEvent(event)
            }
        }
    }
}

我的流程事件函数如下所示

private suspend fun processEvent(event: FcmEvent) = withContext(ioDispatcher) {
        var success = false
        var retries = 0
        while (!success && isActive) {
            try {
                val isProcessed = fcmEventsRepository.isEventProcessed(event.id)
                if (isProcessed) return@withContext

                retryWithExponentialBackoff(
                    maxRetries = 5,
                    initialDelayMillis = 1000,
                    factor = 2,
                    action = {
                        syncRepository.syncByEventType(event.type)
                        fcmEventsRepository.updateFcmEventStatus(
                            event.id,
                            ProcessingStatus.Processed
                        )
                    }
                )
                success = true
            } catch (e: Exception) {
                Timber.e(e, "Failed to process event $event, retrying in 1 minute...")
                delay(eventCheckDelay)
                retries++
            }
        }
}

如何确保事件处理遵守以下内容

  • 一次只处理一个事件
  • 如果正在处理事件并且取消设备丢失连接处理
  • 如果网络状态发生变化,同一事件不会被多次处理
zour9fqk

zour9fqk1#

如果事件已经被处理,那么processEvent函数已经提前退出,因此如果连接状态丢失并重新获得,您无需担心意外地对同一事件调用processEvent多次。
当一个新的事件到来或者连接丢失时,它只会取消之前的处理,你所需要做的就是用collectLatest替换collect,当一个新的事件或者连接状态到来时,它会取消之前的处理,并再次运行collectLatest lambda。

相关问题