我需要向集群中运行的flink作业添加track和span id,请求流如下所示
用户-->rest api->kafka-topic-1-->flinkjob-1-->kafka-topic-2-->flinkjob-2-->使用者-->数据库
我使用SpringBoot创建RESTAPI,并使用SpringSleuth向生成的日志添加track和span id,track和span id是在调用restapi和将消息放在kakfa-topic-1上时添加的,但是我不知道如何在使用flinkjob-1和flinkjob-2上的消息时添加track和span id,因为它们不在spring上下文中。
一种方法是将track和span id设置为kafka消息头,让kafka消费者/生产者拦截器提取并记录track和span id,我尝试了这个方法,但是我的拦截器没有被调用,因为flink api使用的是flink版本的kafka客户机。
无法调用我的自定义kafkadeserializationschema
public class MyDeserializationSchema implements KafkaDeserializationSchema<String> {
private static final Logger LOGGER = LoggerFactory.getLogger(MyDeserializationSchema.class);
@Override
public TypeInformation<String> getProducedType() {
System.out.println("**************Invoked 1");
LOGGER.debug("**************Invoked 1");
return null;
}
@Override
public boolean isEndOfStream(String nextElement) {
System.out.println("**************Invoked 2");
LOGGER.debug("**************Invoked 2");
return true;
}
@Override
public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
System.out.println("**************Invoked 3");
LOGGER.debug("**************Invoked 3");
return record.toString();
}
}
有没有人能建议我如何做到这一点。
2条答案
按热度按时间uajslkp61#
您在这里使用的是一个简单的字符串,在序列化字节到字符串时,可以执行如下代码所示的操作。
4xy9mtcn2#
也可以使用kafkadeserializationschema来获取头
为了访问kafka消息的键、值和元数据,kafkadeserializationschema具有以下反序列化方法t deserialize(consumerrecord record)。