我在用Flink消费Kafka,给雷迪斯写信。
这是我对redis的sink函数:
.addSink(new RichSinkFunction<MobilePageEvent>() {
@Override
public void invoke(MobilePageEvent event, Context context) {
JEDIS_CLUSTER.zadd(..);
}
})
.name("redis sink");
虽然我可以从redis命令行获取数据,但是指标显示sink函数的输出为零:
如何增加这些指标?
1条答案
按热度按时间qvsjd97n1#
numrecordsin和numrecordsout度量只计算flink作业本身中的流记录,不包括与外部系统的通信。所以换句话说,源不报告任何传入的记录,汇也不报告任何传出的记录。
在我看来,你有几个选择:
使用Flume上的numrecordsin度量作为您想要知道的近似值
分叉或扩展redissink并添加所需的度量
这里显示了添加计数器度量的模式。
对于redis sink,您可以在open()方法中初始化计数器,并在invoke()中递增它。但这似乎毫无意义,因为这只是反映numrecordsin指标。如果您的redis接收器正在进行缓冲的批量写入,那么在数据实际发送到redis之前,等待增量度量可能更有意义——在这种情况下,您可能更愿意使用计数表而不是计数器。