我有这样一个管理器类,它处理这样的事件。
fun init() {
job = CoroutineScope(ioDispatcher).launch {
combine(
networkMonitor.isOnline,
fcmEventsRepository.getPendingFcmEvent()
) { isOnline, event ->
isOnline to event
}.collect { (isOnline, event) ->
if (isOnline && event != null) {
processEvent(event)
}
}
}
}
我的流程事件函数如下所示
private suspend fun processEvent(event: FcmEvent) = withContext(ioDispatcher) {
var success = false
var retries = 0
while (!success && isActive) {
try {
val isProcessed = fcmEventsRepository.isEventProcessed(event.id)
if (isProcessed) return@withContext
retryWithExponentialBackoff(
maxRetries = 5,
initialDelayMillis = 1000,
factor = 2,
action = {
syncRepository.syncByEventType(event.type)
fcmEventsRepository.updateFcmEventStatus(
event.id,
ProcessingStatus.Processed
)
}
)
success = true
} catch (e: Exception) {
Timber.e(e, "Failed to process event $event, retrying in 1 minute...")
delay(eventCheckDelay)
retries++
}
}
}
如何确保事件处理遵守以下内容
- 一次只处理一个事件
- 如果正在处理事件并且取消设备丢失连接处理
- 如果网络状态发生变化,同一事件不会被多次处理
1条答案
按热度按时间zour9fqk1#
如果事件已经被处理,那么
processEvent
函数已经提前退出,因此如果连接状态丢失并重新获得,您无需担心意外地对同一事件调用processEvent
多次。当一个新的事件到来或者连接丢失时,它只会取消之前的处理,你所需要做的就是用
collectLatest
替换collect
,当一个新的事件或者连接状态到来时,它会取消之前的处理,并再次运行collectLatest
lambda。