基于avro模式,我生成了一个类(data)来处理适合该模式的类,然后我对数据进行编码并使用kafka发送到其他应用程序“a”
Data data; // <- The object was initialized before . Here it is only the declaration "for example"
EncoderFactory encoderFactory = EncoderFactory.get();
ByteArrayOutputStream out = new ByteArrayOutputStream();
BinaryEncoder encoder = encoderFactory. directBinaryEncoder(out, null);
DatumWriter<Tloog> writer;
writer = new SpecificDatumWriter<Data>( Data.class);
writer.write(data, encoder);
byte[] avroByteMessage = out.toByteArray();
另一方面(在应用程序“a”中),我通过实现反序列化器来反序列化数据
class DataDeserializer implements Deserializer<Data> {
private String encoding = "UTF8";
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// nothing to do
}
@Override
public Tloog deserialize(String topic, byte[] data) {
try {
if (data == null)
{
return null;
}
else
{
DatumReader<Tloog> reader = new SpecificDatumReader<Data>( Data.class);
DecoderFactory decoderFactory = DecoderFactory.get();
BinaryDecoder decoder = decoderFactory.binaryDecoder( data, null);
Data decoded = reader.read(null, decoder);
return decoded;
}
} catch (Exception e) {
throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
}
}
问题是这种方法需要使用specificdatumreader,即数据类应该与应用程序代码集成…这可能有问题-架构可能会更改,因此应该重新生成数据类并再次集成2个问题:
我应该在应用程序中使用genericdatumreader吗?如何正确地做到这一点(我可以在应用程序中简单地保存模式)
如果数据发生变化,有没有一种简单的方法来处理specificdatumreader?它怎么能集成而不出很多麻烦呢?
谢谢
1条答案
按热度按时间uyhoqukh1#
我用
GenericDatumReader
--好吧,实际上我是从它派生出我的reader类的,但是你明白我的意思了。为了使用它,我把我的模式放在一个特别的Kafka主题里--Schema
令人惊讶的是。使用者和生产者在启动时都会阅读本主题并配置各自的解析器。如果您这样做,您甚至可以让您的消费者和生产者动态地更新他们的模式,而不必重新启动它们。这是我的设计目标——我不想为了添加或更改模式而重新启动应用程序。这就是为什么
SpecificDatumReader
对我不起作用,老实说我为什么用Avro
而不是像这样Thrift
.更新
执行avro的正常方法是将模式与记录一起存储在文件中。我不这样做,主要是因为我不能。我用
Kafka
,所以我不能直接将模式与数据一起存储--我必须将模式存储在单独的主题中。我这样做,首先我加载我所有的模式。你可以从文本文件中读取它们;但就像我说的,我是从一个
Kafka
主题。在我读了Kafka的作品后,我有一个这样的数组:为这件事道歉
Scala
顺便说一句,但这是我得到的。无论如何,您需要创建一个解析器,然后foreach schema,解析它,创建reader和writer,并将它们保存到maps:
我在解析一个实际记录之前做了所有这些——这只是为了配置解析器。然后,要解码一个单独的记录,我会做: