我在Kafka有一个主题,在那里我得到了json格式的多种类型的事件。我创建了一个filestreamsink,用bucketing将这些事件写入s3。
FlinkKafkaConsumer errorTopicConsumer = new FlinkKafkaConsumer(ERROR_KAFKA_TOPICS,
new SimpleStringSchema(),
properties);
final StreamingFileSink<Object> errorSink = StreamingFileSink
.forRowFormat(new Path(outputPath + "/error"), new SimpleStringEncoder<>("UTF-8"))
.withBucketAssigner(new EventTimeBucketAssignerJson())
.build();
env.addSource(errorTopicConsumer)
.name("error_source")
.setParallelism(1)
.addSink(errorSink)
.name("error_sink").setParallelism(1);
public class EventTimeBucketAssignerJson implements BucketAssigner<Object, String> {
@Override
public String getBucketId(Object record, Context context) {
StringBuffer partitionString = new StringBuffer();
Tuple3<String, Long, String> tuple3 = (Tuple3<String, Long, String>) record;
try {
partitionString.append("event_name=")
.append(tuple3.f0).append("/");
String timePartition = TimeUtils.getEventTimeDayPartition(tuple3.f1);
partitionString.append(timePartition);
} catch (Exception e) {
partitionString.append("year=").append(Constants.DEFAULT_YEAR).append("/")
.append("month=").append(Constants.DEFAULT_MONTH).append("/")
.append("day=").append(Constants.DEFAULT_DAY);
}
return partitionString.toString();
}
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
}
现在我想把每个事件的每小时计数作为度量标准发布给普罗米修斯,并在上面发布一个grafana Jmeter 板。
所以,请帮助我如何实现每一个事件的小时计数使用Flink指标和发布到普罗米修斯。
谢谢
1条答案
按热度按时间pqwbnv8z1#
通常,只需为请求创建一个计数器,然后使用
rate()
在普罗米修斯函数中,这将给出给定时间内请求的速率。然而,如果你出于某种原因想自己做这件事,那么你可以做一些类似于过去所做的事情
org.apache.kafka.common.metrics.stats.Rate
. 因此,在这种情况下,您需要收集样本列表,其中包含采集样本的时间,以及用于计算速率的窗口大小,然后您可以简单地进行计算,即删除超出范围且已过期的样本,然后简单地计算窗口中的样本数。然后你可以设置
Gauge
到计算值。