storm watch有什么服务吗?

q8l4jmvw  于 2021-06-21  发布在  Storm
关注(0)|答案(1)|浏览(180)

我有一个螺栓,输入文件不断更新。但我无法获取更新的内容,因为我正在从中读取文件 prepare() 方法。我想在不停止或终止拓扑的情况下获取更新的文件。有没有像Storm中的守望服务这样的服务?或者有什么不同的方法?

dvtswwa3

dvtswwa31#

解决问题的一种方法是定义一个喷口,定期检查文件是否更改。一旦它这样做了,它会发送一个元组通知您的螺栓有关的变化。螺栓会反过来重新加载文件。以下是一些关于实现的提示:
拓扑将包含新的监视喷口。您的bolt将订阅它的流以及它所需的任何其他流(bolt可以使用多个流):

topologyBuilder.setSpout("file_checking_spout", new FileCheckingSpout(myMonitoredFile));
topologyBuilder.setBolt("my_bolt", new MyBolt())
    .shuffleGrouping("file_checking_spout")
    .shuffleGrouping("whatever other grouping you need");

喷口将进行监控。如果只有一个要监视的文件,则可以发出空元组作为通知:

public class FileCheckingSpout extends BaseRichSpout {
    @Override
    public void nextTuple() {
        Thread.sleep(500);
        if (fileChanged()) { // check e.g. file modified timestamp
            collector.emit(new Values());
        }
    }

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields());
    }
    // ...
}

您的bolt现在必须接受有关文件重新加载的通知。它可以区分通知元组,例如使用 tuple.getSourceComponent() :

class MyBolt implements IRichBolt {
    @Override 
    public void execute(Tuple tuple) {
        if ("file_checking_spout".equals(tuple.getSourceComponent())) {
             reloadFile();
             return;
        }
        // normal processing
    }
    //...
}

你也可以简单地检查一下文件是否在你的bolt中发生了变化 nextTuple() . 上面描述的方式更像是“风暴方式”,因为它分离了关注点,并且重新加载不依赖于任何其他流。
ps:当然,只要文件可以从spout和bolt访问,就可以工作,也就是说,如果您在集群中运行,它应该在共享文件系统上。

相关问题