我们使用apachekafka(不是合流的kafka)0.10。我们想与Kafka建立avro模式。我有如下avro模式。
{
"namespace": "Rule",
"type": "record",
"name": "RuleMessage",
"fields": [
{
"name": "station",
"type": "string"
},
{
"name": "model",
"type": "string"
}
}
序列化消息,例如,
public byte[] serializeMessage(EventMessage eventMessage) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
DatumWriter<EventMessage> writer = new SpecificDatumWriter<EventMessage>(EventMessage.getClassSchema());
writer.write(eventMessage, encoder);
encoder.flush();
out.close();
return out.toByteArray();
}
这是预期的工作。
但是,希望在主题级别设置一个avro模式,以便在消息不符合avro模式时,主题将拒绝消息。
不管怎样,我可以用ApacheKafka0.10做这个。
谢谢
1条答案
按热度按时间e5njpo681#
您可以将confluent的schema registry(其开放源码和apache许可)与ApacheKafka0.10.0结合使用,将模式与主题关联起来。它附带了avro序列化程序/反序列化程序,可以完全按照您请求的方式自动验证avro模式。
请注意,这不是“合流Kafka”这样的东西-这将是一个商标侵犯拥有它。为了方便起见,confluent只是将ApacheKafka打包到其发行版中,但是由于模式注册表位于github上,因此如果您愿意,可以不使用confluent打包而使用它。