我已经为apache flink编写了一个非常简单的java程序,现在我对测量吞吐量(每秒处理的元组数)和延迟(程序需要处理每个输入元组的时间)等统计数据很感兴趣。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.readTextFile("/home/LizardKing/Documents/Power/Prova.csv")
.map(new MyMapper().writeAsCsv("/home/LizardKing/Results.csv");
JobExecutionResult res = env.execute();
我知道flink暴露了一些指标:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/metrics.html
但是我不知道如何使用它们来获得我想要的。从这个链接中我读到一个“meter”可以用来测量平均吞吐量,但是在定义了它之后,我应该如何使用它呢?
1条答案
按热度按时间vhipe2zx1#
我们正在运行的定制指标,如米,在我们的生产流水作业在Yarn运行计量。
以下是步骤:
pom.xml的附加依赖关系
我们使用的是1.2.1版本
然后将meter添加到mymapper类。
希望这有帮助。