我使用avro模式文件和生成的java源代码编写了kafka avro反序列化程序。要求是不要使用pojo的。如何使下面的代码不使用pojo和泛型模式转换。
import java.util.Arrays;
import java.util.Map;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import com.example.org.model.Person;
public class AvroDeserializer implements Deserializer<GenericRecord> {
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public GenericRecord deserialize(String topic, byte[] data) {
try {
GenericRecord result = null;
if (data != null) {
DatumReader<Person> reader = new SpecificDatumReader<>
(Person.getSchema());
Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
result = (GenericRecord) reader.read(null, decoder);
}
return result;
} catch (Exception ex) {
throw new SerializationException(
"Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
}
}
}
如何使此代码不使用pojo的。
2条答案
按热度按时间qv7cva1a1#
有几种方法可以做到这一点。您可以将其添加到构造函数:
并使用targettype反序列化:
然后,从使用反序列化程序的客户端:
注意:使用这种方法,您不能使用反序列化属性来配置使用者,因为它使用空构造函数。
hs1rzwqc2#
如果您在serilizer中使用pojo类,那么您将存储模式和数据,这将导致解析消息的速度变慢,并在存储级别占用额外的空间。你必须改变你的生活
Serilizer
以及DeSerilizer
.为了解决这个问题,使用了schema registry。
schema registry的基本思想是生产者/消费者在向主题读写数据时将引用avro模式。
我们不想像您暗示的那样为每个数据编写模式—通常,模式比数据大!这将浪费每次读取时解析它的时间,并且浪费资源(网络、磁盘、cpu)
我将建议您通过下面的链接查看代码以及对这个主题的详细描述。
https://blog.cloudera.com/blog/2018/07/robust-message-serialization-in-apache-kafka-using-apache-avro-part-1/