在我的storm拓扑中,在处理流时,我希望将某些消息的处理延迟到将来的某个时间点。有哪些合理的选择?
到目前为止,我想到了以下几点:
使用java的 Thread.sleep
. (然而,根据一些讨论,这并不是有效利用storm资源的推荐方法。)
使用延迟队列。。。
特别是,请尝试java.util.concurrent.delayqueue。
还有其他值得尝试的实现吗?
storm是否有延迟我忽略的消息的api?
zeromq是否提供了storm(如果修改)可以利用的延迟消息传递api?
在我的storm拓扑中,在处理流时,我希望将某些消息的处理延迟到将来的某个时间点。有哪些合理的选择?
到目前为止,我想到了以下几点:
使用java的 Thread.sleep
. (然而,根据一些讨论,这并不是有效利用storm资源的推荐方法。)
使用延迟队列。。。
特别是,请尝试java.util.concurrent.delayqueue。
还有其他值得尝试的实现吗?
storm是否有延迟我忽略的消息的api?
zeromq是否提供了storm(如果修改)可以利用的延迟消息传递api?
2条答案
按热度按时间1l5u6lss1#
我们使用拓扑记号元组来批量处理挂起的元组。它基本上只是将它们存储在每个普通元组的内存中,当它接收到一个tick元组时,它使用批量/流水线处理将它们处理到存储/索引中。
我们还使用redis来处理大量的卷尖峰,如果一个卷尖峰检测到所有元组,则重定向到每个主机上的本地redis存储,然后在卷关闭后被推回到拓扑处理中。我们的情况可能不适用于你,只适用于我。
14ifxucb2#
使用外部消息队列实现延时队列。
由于storm是容错的,并且是水平分布的,所以选择一个适合这种样式的消息队列是有意义的,例如:
Kafka
亚马逊sqs
兔子MQ