如何在flink中增加sinkfunction的numrecordsoutput度量?

lnlaulya  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(403)

我在用Flink消费Kafka,给雷迪斯写信。
这是我对redis的sink函数:

.addSink(new RichSinkFunction<MobilePageEvent>() {

                @Override
                public void invoke(MobilePageEvent event, Context context) {

                    JEDIS_CLUSTER.zadd(..);
                }
            })
            .name("redis sink");

虽然我可以从redis命令行获取数据,但是指标显示sink函数的输出为零:

如何增加这些指标?

qvsjd97n

qvsjd97n1#

numrecordsin和numrecordsout度量只计算flink作业本身中的流记录,不包括与外部系统的通信。所以换句话说,源不报告任何传入的记录,汇也不报告任何传出的记录。
在我看来,你有几个选择:
使用Flume上的numrecordsin度量作为您想要知道的近似值
分叉或扩展redissink并添加所需的度量
这里显示了添加计数器度量的模式。
对于redis sink,您可以在open()方法中初始化计数器,并在invoke()中递增它。但这似乎毫无意义,因为这只是反映numrecordsin指标。如果您的redis接收器正在进行缓冲的批量写入,那么在数据实际发送到redis之前,等待增量度量可能更有意义——在这种情况下,您可能更愿意使用计数表而不是计数器。

相关问题