使用更改模式反序列化

toe95027  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(397)

基于avro模式,我生成了一个类(data)来处理适合该模式的类,然后我对数据进行编码并使用kafka发送到其他应用程序“a”

Data data; // <- The object was initialized before . Here it is only the declaration "for example"
EncoderFactory encoderFactory = EncoderFactory.get();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = encoderFactory. directBinaryEncoder(out, null);                    
        DatumWriter<Tloog> writer;                  
        writer = new SpecificDatumWriter<Data>( Data.class);
        writer.write(data, encoder);
        byte[] avroByteMessage = out.toByteArray();

另一方面(在应用程序“a”中),我通过实现反序列化器来反序列化数据

class DataDeserializer implements Deserializer<Data> {
    private String encoding = "UTF8";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to do
    }

    @Override
    public Tloog deserialize(String topic, byte[] data) {
        try {
            if (data == null)
            {
                return null;
            }
            else
            {
                        DatumReader<Tloog> reader = new SpecificDatumReader<Data>( Data.class);
                        DecoderFactory decoderFactory = DecoderFactory.get();
                        BinaryDecoder decoder = decoderFactory.binaryDecoder( data, null);
                        Data decoded = reader.read(null, decoder);
                        return decoded;
            }
        } catch (Exception e) {
            throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
        }
    }

问题是这种方法需要使用specificdatumreader,即数据类应该与应用程序代码集成…这可能有问题-架构可能会更改,因此应该重新生成数据类并再次集成2个问题:
我应该在应用程序中使用genericdatumreader吗?如何正确地做到这一点(我可以在应用程序中简单地保存模式)
如果数据发生变化,有没有一种简单的方法来处理specificdatumreader?它怎么能集成而不出很多麻烦呢?
谢谢

uyhoqukh

uyhoqukh1#

我用 GenericDatumReader --好吧,实际上我是从它派生出我的reader类的,但是你明白我的意思了。为了使用它,我把我的模式放在一个特别的Kafka主题里-- Schema 令人惊讶的是。使用者和生产者在启动时都会阅读本主题并配置各自的解析器。
如果您这样做,您甚至可以让您的消费者和生产者动态地更新他们的模式,而不必重新启动它们。这是我的设计目标——我不想为了添加或更改模式而重新启动应用程序。这就是为什么 SpecificDatumReader 对我不起作用,老实说我为什么用 Avro 而不是像这样 Thrift .
更新
执行avro的正常方法是将模式与记录一起存储在文件中。我不这样做,主要是因为我不能。我用 Kafka ,所以我不能直接将模式与数据一起存储--我必须将模式存储在单独的主题中。
我这样做,首先我加载我所有的模式。你可以从文本文件中读取它们;但就像我说的,我是从一个 Kafka 主题。在我读了Kafka的作品后,我有一个这样的数组:

val schemaArray: Array[String] = Array(
  """{"name":"MyObj","type":"record","fields":[...]}""",
  """{"name":"MyOtherObj","type":"record","fields":[...]}"""
)

为这件事道歉 Scala 顺便说一句,但这是我得到的。
无论如何,您需要创建一个解析器,然后foreach schema,解析它,创建reader和writer,并将它们保存到maps:

val parser = new Schema.Parser()
val schemas = Map(schemaArray.map{s => parser.parse(s)}.map(s => (s.getName, s)):_*)
val readers = schemas.map(s => (s._1, new GenericDatumReader[GenericRecord](s._2)))
val writers = schemas.map(s => (s._1, new GenericDatumWriter[GenericRecord](s._2)))
var decoder: BinaryDecoder = null

我在解析一个实际记录之前做了所有这些——这只是为了配置解析器。然后,要解码一个单独的记录,我会做:

val byteArray: Array[Byte] = ... // <-- Avro encoded record
val schemaName: String = ... // <-- name of the Avro schema

val reader = readers.get(schemaName).get

decoder = DecoderFactory.get.binaryDecoder(byteArray, decoder)
val record = reader.read(null, decoder)

相关问题