flink statsd度量

ergxz8rk  于 2021-06-21  发布在  Flink
关注(0)|答案(0)|浏览(228)

我看不到从Flink寄来的统计数字。已经将flink-metrics-statsd-1.4.1.jar放在jm和tms/lib文件夹中。使用下面的代码,我希望将“mycounter”看作一个xdb系列。但是在xdb里什么都没有,所以没有显示出ingrafana。有什么建议吗?谢谢!

public class StatsdCounter extends RichMapFunction<String, String> {
    private transient Counter counter;

    @Override
    public void open(Configuration config) {
        this.counter = getRuntimeContext()
        .getMetricGroup()
        .counter("myCounter");
    }

    @Override
    public String map(String value) throws Exception {
        this.counter.inc();
        return value;
}

在flink-conf.yaml中,

metrics.scope.jm: <host>.jobmanager
metrics.scope.jm.job: <host>.jobmanager.<job_name>
metrics.scope.tm: <host>.taskmanager.<tm_id>
metrics.scope.tm.job: <host>.taskmanager.<tm_id>.<job_name>
metrics.scope.task: <host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
metrics.scope.operator: <host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>

metrics.reporters: stsd
metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
metrics.reporter.stsd.host: 192.168.56.10
metrics.reporter.stsd.port: 8127
metrics.reporter.stsd.interval: 1 SECONDS

但是如果我们在代码中手动设置statsd,

public void open(Configuration configuration) throws Exception {
    super.open(configuration);
    counter = new AtomicInteger(); 
    stats = new NonBlockingStatsDClient(xxx);
}

@Override
public String map(String value) throws Exception { 
  stats.increment(statsdAspect); 
  return value;
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题