我正在开发的Flink消费者应用程序从多个Kafka主题中读取。在不同主题中发布的消息遵循相同的模式(格式为Avro)。对于模式管理,我使用Confluent Schema Registry。
我一直在使用下面的代码片段作为KafkaSource,它工作得很好。
KafkaSource<MyObject> source = KafkaSource.<MyObject>builder()
.setBootstrapServers(BOOTSTRAP_SERVERS)
.setTopics(TOPIC-1, TOPIC-2)
.setGroupId(GROUP_ID)
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(ConfluentRegistryAvroDeserializationSchema.forSpecific(MyObject.class, SCHEMA_REGISTRY_URL))
.build();
现在,我想确定我处理的每个消息的主题名称。由于当前的反序列化器是ValueOnly,我开始研究setDeserializer()方法,我觉得它可以给予我访问整个ConsumerRecord对象,我可以从中获取主题名称。
但是,我不知道如何使用该实现。我应该实现自己的反序列化程序吗?如果是,架构注册表如何适应该实现?
2条答案
按热度按时间hmmo2u0o1#
您可以将
setDeserializer
方法与KafkaRecordDeserializationSchema
一起使用,KafkaRecordDeserializationSchema
可能如下所示:然后可以使用
ConsumerRecord
访问主题和其他元数据。fxnxkyjh2#
我从上面的答案(大卫的)中获得灵感,并添加了以下自定义反序列化器-
Event类是MyObject类上的 Package 器,具有用于存储主题名称的附加字段。