我很好地收集了从apache storm到graphite的数据。然后我开发了一个定制的调度器来实现ischeduler接口,现在我无法收集任何度量。
这是我的日程安排:
public class SiteAwareScheduler implements IScheduler {
@Override
public void prepare(Map conf) {
}
@Override
public void schedule(Topologies topologies, Cluster cluster) {
....
}
}
在storm.yaml文件中:
supervisor.scheduler.meta:
site: "cluster"
storm.scheduler: "org.sense.storm.scheduler.SiteAwareScheduler"
我使用这个库来收集度量并将其发送到graphite服务器。
public class MyTopology {
config.put(YammerFacadeMetric.FACADE_METRIC_TIME_BUCKET_IN_SEC, 30);
config.put(SimpleGraphiteStormMetricProcessor.GRAPHITE_HOST, "127.0.0.1");
config.put(SimpleGraphiteStormMetricProcessor.GRAPHITE_PORT, 2003);
config.put(SimpleGraphiteStormMetricProcessor.REPORT_PERIOD_IN_SEC, 10);
config.put(Config.TOPOLOGY_NAME, "MqttSensorSumTopology");
config.registerMetricsConsumer(MetricReporter.class, new MetricReporterConfig(".*", SimpleGraphiteStormMetricProcessor.class.getCanonicalName()), 1);
...
TopologyBuilder topologyBuilder = new TopologyBuilder();
...
topologyBuilder.setBolt(MqttSensors.BOLT_SENSOR_TICKET_SUM.getValue(), new SumSensorValuesWindowBolt(SensorType.COUNTER_TICKETS).withTumblingWindow(Duration.seconds(5)), 1)
.shuffleGrouping(MqttSensors.SPOUT_STATION_01_TICKETS.getValue())
.addConfiguration(TagSite.SITE.getValue(), TagSite.CLUSTER.getValue());
}
public class SumSensorValuesWindowBolt extends BaseWindowedBolt {
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
StormYammerMetricsAdapter.configure(stormConf, context, new MetricsRegistry());
this.collector = collector;
}
public void execute(TupleWindow inputWindow) {
...
}
}
ischeduler的“prepare”方法没有topologycontext,所以我不知道在哪里用新的时间表示例化我的度量。
有什么提示吗?谢谢菲利佩
1条答案
按热度按时间rkttyhzu1#
我在这个链接上安装了石墨。然后我在这两台机器上配置了文件storm.yaml,正如它在这个链接中所说的那样。主人和工人。
然后,这些度量将以我的拓扑的名称出现在graphite服务器上
MqttSensorSumTopology-1-1553506290
.