不使用pojo的kafka avro反序列化程序

6tr1vspr  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(369)

我使用avro模式文件和生成的java源代码编写了kafka avro反序列化程序。要求是不要使用pojo的。如何使下面的代码不使用pojo和泛型模式转换。

import java.util.Arrays;
    import java.util.Map;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.DatumReader;
    import org.apache.avro.io.Decoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;
    import com.example.org.model.Person;

    public class AvroDeserializer implements Deserializer<GenericRecord> {

     @Override
     public void close() {

     }

     @Override
     public void configure(Map<String, ?> configs, boolean isKey) {

    }

    @Override
    public GenericRecord deserialize(String topic, byte[] data) {
    try {
      GenericRecord result = null;

      if (data != null) {
        DatumReader<Person> reader = new SpecificDatumReader<> 
     (Person.getSchema());

        Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
        result = (GenericRecord) reader.read(null, decoder);
      }
      return result;
     } catch (Exception ex) {
      throw new SerializationException(
      "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
    }
  }
    }

如何使此代码不使用pojo的。

qv7cva1a

qv7cva1a1#

有几种方法可以做到这一点。您可以将其添加到构造函数:

protected final Class<T> targetType;

public AvroDeserializer(Class<T> targetType) {
    this.targetType = targetType;
}

并使用targettype反序列化:

SpecificDatumReader<GenericRecord> datumReader =
            new SpecificDatumReader<>(targetType.newInstance().getSchema());

然后,从使用反序列化程序的客户端:

AvroEmbeddedDeserializer<Test> avroEmbeddedDeserializer = new AvroEmbeddedDeserializer<>(Test.class);

final KafkaConsumer<String, Test> consumer = new KafkaConsumer<>(props, stringDeserializer, avroEmbeddedDeserializer);

注意:使用这种方法,您不能使用反序列化属性来配置使用者,因为它使用空构造函数。

hs1rzwqc

hs1rzwqc2#

如果您在serilizer中使用pojo类,那么您将存储模式和数据,这将导致解析消息的速度变慢,并在存储级别占用额外的空间。你必须改变你的生活 Serilizer 以及 DeSerilizer .
为了解决这个问题,使用了schema registry。

schema registry的基本思想是生产者/消费者在向主题读写数据时将引用avro模式。
我们不想像您暗示的那样为每个数据编写模式—通常,模式比数据大!这将浪费每次读取时解析它的时间,并且浪费资源(网络、磁盘、cpu)
我将建议您通过下面的链接查看代码以及对这个主题的详细描述。
https://blog.cloudera.com/blog/2018/07/robust-message-serialization-in-apache-kafka-using-apache-avro-part-1/

相关问题