使用metric的事件flink计数

yhuiod9q  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(336)

我在Kafka有一个主题,在那里我得到了json格式的多种类型的事件。我创建了一个filestreamsink,用bucketing将这些事件写入s3。

FlinkKafkaConsumer errorTopicConsumer = new FlinkKafkaConsumer(ERROR_KAFKA_TOPICS,
                new SimpleStringSchema(),
                properties);
        final StreamingFileSink<Object> errorSink = StreamingFileSink
                .forRowFormat(new Path(outputPath + "/error"), new SimpleStringEncoder<>("UTF-8"))
                .withBucketAssigner(new EventTimeBucketAssignerJson())
                .build();

        env.addSource(errorTopicConsumer)
                .name("error_source")
                .setParallelism(1)
                .addSink(errorSink)
                .name("error_sink").setParallelism(1);
public class EventTimeBucketAssignerJson implements BucketAssigner<Object, String> {

    @Override
    public String getBucketId(Object record, Context context) {
        StringBuffer partitionString = new StringBuffer();
        Tuple3<String, Long, String> tuple3 = (Tuple3<String, Long, String>) record;
        try {
            partitionString.append("event_name=")
                    .append(tuple3.f0).append("/");

            String timePartition = TimeUtils.getEventTimeDayPartition(tuple3.f1);
            partitionString.append(timePartition);
        } catch (Exception e) {
            partitionString.append("year=").append(Constants.DEFAULT_YEAR).append("/")
                    .append("month=").append(Constants.DEFAULT_MONTH).append("/")
                    .append("day=").append(Constants.DEFAULT_DAY);
        }
        return partitionString.toString();
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

现在我想把每个事件的每小时计数作为度量标准发布给普罗米修斯,并在上面发布一个grafana Jmeter 板。
所以,请帮助我如何实现每一个事件的小时计数使用Flink指标和发布到普罗米修斯。
谢谢

pqwbnv8z

pqwbnv8z1#

通常,只需为请求创建一个计数器,然后使用 rate() 在普罗米修斯函数中,这将给出给定时间内请求的速率。
然而,如果你出于某种原因想自己做这件事,那么你可以做一些类似于过去所做的事情 org.apache.kafka.common.metrics.stats.Rate . 因此,在这种情况下,您需要收集样本列表,其中包含采集样本的时间,以及用于计算速率的窗口大小,然后您可以简单地进行计算,即删除超出范围且已过期的样本,然后简单地计算窗口中的样本数。
然后你可以设置 Gauge 到计算值。

相关问题