使用Flink自定义avro消息反序列化

js5cn81o  于 2022-12-09  发布在  Apache
关注(0)|答案(2)|浏览(329)

我正在开发的Flink消费者应用程序从多个Kafka主题中读取。在不同主题中发布的消息遵循相同的模式(格式为Avro)。对于模式管理,我使用Confluent Schema Registry。
我一直在使用下面的代码片段作为KafkaSource,它工作得很好。

KafkaSource<MyObject> source = KafkaSource.<MyObject>builder()
                .setBootstrapServers(BOOTSTRAP_SERVERS)
                .setTopics(TOPIC-1, TOPIC-2)
                .setGroupId(GROUP_ID)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(ConfluentRegistryAvroDeserializationSchema.forSpecific(MyObject.class, SCHEMA_REGISTRY_URL))
                .build();

现在,我想确定我处理的每个消息的主题名称。由于当前的反序列化器是ValueOnly,我开始研究setDeserializer()方法,我觉得它可以给予我访问整个ConsumerRecord对象,我可以从中获取主题名称。
但是,我不知道如何使用该实现。我应该实现自己的反序列化程序吗?如果是,架构注册表如何适应该实现?

hmmo2u0o

hmmo2u0o1#

您可以将setDeserializer方法与KafkaRecordDeserializationSchema一起使用,KafkaRecordDeserializationSchema可能如下所示:

public class KafkaUsageRecordDeserializationSchema
        implements KafkaRecordDeserializationSchema<UsageRecord> {

    private static final long serialVersionUID = 1L;

    private transient ObjectMapper objectMapper;

    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
        KafkaRecordDeserializationSchema.super.open(context);
        objectMapper = JsonMapper.builder().build();
    }

    @Override
    public void deserialize(
            ConsumerRecord<byte[], byte[]> consumerRecord,
            Collector<UsageRecord> collector) throws IOException {

        collector.collect(objectMapper.readValue(consumerRecord.value(), UsageRecord.class));
    }

    @Override
    public TypeInformation<UsageRecord> getProducedType() {
        return TypeInformation.of(UsageRecord.class);
    }
}

然后可以使用ConsumerRecord访问主题和其他元数据。

fxnxkyjh

fxnxkyjh2#

我从上面的答案(大卫的)中获得灵感,并添加了以下自定义反序列化器-

KafkaSource<MyObject> source = KafkaSource.<MyObject>builder()
          .setBootstrapServers(BOOTSTRAP_SERVERS)
          .setTopics(TOPIC-1, TOPIC-2)
          .setGroupId(GROUP_ID)
          .setStartingOffsets(OffsetsInitializer.earliest())
          .setDeserializer(KafkaRecordDeserializationSchema.of(new KafkaDeserializationSchema<Event>{                                                          
    
           DeserializationSchema deserialzationSchema = ConfluentRegistryAvroDeserializationSchema.forSpecific(MyObject.class, SCHEMA_REGISTRY_URL);

           @Override
           public boolean isEndOfStream(Event nextElement) {
              return false;
           }
        
           @Override
           public String deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
              Event event = new Event();
              event.setTopicName(record.topic());
              event.setMyObject((MyObject) deserializationSchema.deserialize(record.value()));
              return event;
           }
        
           @Override
           public TypeInformation<String> getProducedType() {
              return TypeInformation.of(Event.class);
           }
           })).build();

Event类是MyObject类上的 Package 器,具有用于存储主题名称的附加字段。

相关问题