@Service
class CalculationBatcher(
private val commandGateway: CommandGateway
) {
private val calculationQueue = LinkedList<Any>()
fun queueCalculation(calculation: Any) {
calculationQueue.add(calculation)
}
@Scheduled(fixedRate = 10000) // Send every 10 seconds
@PreDestroy // When destroying the application, send remaining events
fun sendCalculations() {
// Use pop here on the LinkedList while having items to prevent threading issues
val calculationsToSend = LinkedList<Any>()
while (calculationQueue.isNotEmpty()) {
calculationsToSend.push(calculationQueue.pop())
}
commandGateway.sendAndWait<Any>(MyEventsCommand(calculationsToSend), 10, TimeUnit.SECONDS)
}
data class MyEventsCommand(val events: List<Any>)
}
1条答案
按热度按时间iibxawm41#
这里为什么要有一个等待期?
如果原因是在Axon Framework处理完命令中的所有数据之前等待,则可以改用
commandGateway.sendAndWait(Object command, ...)
。这将使当前线程等待命令执行完毕。如果它是一种批处理数据的机制,我建议在内存中保留一个
List
来对项目进行排队,然后使用Spring调度机制每10秒发送一个命令。我希望这能有所帮助。如果原因是别的,让我知道!