我将多个元组发送到一个parelleism count为8的bolt并处理,然后我想发出id,索引到下一个bolt,该bolt验证是否所有元组都已处理并发送通知。怎么做?storm已经有工具了吗?
wxclj1h51#
storm已经有了一个消息保证系统。您需要通过扩展basicbolt和basicspout类(默认情况下会确认每个元组)或显式调用outputcollector类的ack方法在bolt中启用此功能。然后,您需要将逻辑添加到喷口的fail和ack方法中。消息保证api的更多细节在这里。如果一个元组(由喷口发出)无法通过您的拓扑结构,这个系统将通知您。如果您想通过执行者之间的每一步了解成功,您要么需要在bolt中使用自定义逻辑,要么修改acker实现(不推荐)。但是,如果保证每个消息只经过一次处理,那么您可能应该看看trident库。
1条答案
按热度按时间wxclj1h51#
storm已经有了一个消息保证系统。您需要通过扩展basicbolt和basicspout类(默认情况下会确认每个元组)或显式调用outputcollector类的ack方法在bolt中启用此功能。
然后,您需要将逻辑添加到喷口的fail和ack方法中。消息保证api的更多细节在这里。
如果一个元组(由喷口发出)无法通过您的拓扑结构,这个系统将通知您。如果您想通过执行者之间的每一步了解成功,您要么需要在bolt中使用自定义逻辑,要么修改acker实现(不推荐)。
但是,如果保证每个消息只经过一次处理,那么您可能应该看看trident库。