rabbitmq 在RabbitListener中发送命令后等待10秒

l5tcr1uw  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(1)|浏览(387)

我有我的RabbitListener女巫收到的计算结果。这是大量的数据,所以我批处理它,然后发送一个命令到axon commandGateway。我想知道是否有可能为侦听器等待几秒钟,然后发送下一个命令。我注意到,我不能在这里使用threadSleep。它可以像这样为每个批处理工作的方法

private void myMethod() {
  commandGateway.send(batchedData) 
   //wait 10seconds
iibxawm4

iibxawm41#

这里为什么要有一个等待期?
如果原因是在Axon Framework处理完命令中的所有数据之前等待,则可以改用commandGateway.sendAndWait(Object command, ...)。这将使当前线程等待命令执行完毕。
如果它是一种批处理数据的机制,我建议在内存中保留一个List来对项目进行排队,然后使用Spring调度机制每10秒发送一个命令。

@Service
class CalculationBatcher(
    private val commandGateway: CommandGateway
) {
    private val calculationQueue = LinkedList<Any>()

    fun queueCalculation(calculation: Any) {
        calculationQueue.add(calculation)
    }

    @Scheduled(fixedRate = 10000) // Send every 10 seconds
    @PreDestroy // When destroying the application, send remaining events
    fun sendCalculations() {
        // Use pop here on the LinkedList while having items to prevent threading issues
        val calculationsToSend = LinkedList<Any>()
        while (calculationQueue.isNotEmpty()) {
            calculationsToSend.push(calculationQueue.pop())
        }
        commandGateway.sendAndWait<Any>(MyEventsCommand(calculationsToSend), 10, TimeUnit.SECONDS)
    }

    data class MyEventsCommand(val events: List<Any>)
}

我希望这能有所帮助。如果原因是别的,让我知道!

相关问题