topologyBuilder.setSpout("file_checking_spout", new FileCheckingSpout(myMonitoredFile));
topologyBuilder.setBolt("my_bolt", new MyBolt())
.shuffleGrouping("file_checking_spout")
.shuffleGrouping("whatever other grouping you need");
喷口将进行监控。如果只有一个要监视的文件,则可以发出空元组作为通知:
public class FileCheckingSpout extends BaseRichSpout {
@Override
public void nextTuple() {
Thread.sleep(500);
if (fileChanged()) { // check e.g. file modified timestamp
collector.emit(new Values());
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields());
}
// ...
}
1条答案
按热度按时间dvtswwa31#
解决问题的一种方法是定义一个喷口,定期检查文件是否更改。一旦它这样做了,它会发送一个元组通知您的螺栓有关的变化。螺栓会反过来重新加载文件。以下是一些关于实现的提示:
拓扑将包含新的监视喷口。您的bolt将订阅它的流以及它所需的任何其他流(bolt可以使用多个流):
喷口将进行监控。如果只有一个要监视的文件,则可以发出空元组作为通知:
您的bolt现在必须接受有关文件重新加载的通知。它可以区分通知元组,例如使用
tuple.getSourceComponent()
:你也可以简单地检查一下文件是否在你的bolt中发生了变化
nextTuple()
. 上面描述的方式更像是“风暴方式”,因为它分离了关注点,并且重新加载不依赖于任何其他流。ps:当然,只要文件可以从spout和bolt访问,就可以工作,也就是说,如果您在集群中运行,它应该在共享文件系统上。