kafka

blmhpbnm  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(237)

我们使用apachekafka(不是合流的kafka)0.10。我们想与Kafka建立avro模式。我有如下avro模式。

{
  "namespace": "Rule",
  "type": "record",
  "name": "RuleMessage",
  "fields": [
    {
      "name": "station",
      "type": "string"
    },
    {
      "name": "model",
      "type": "string"
    }
}

序列化消息,例如,

public byte[] serializeMessage(EventMessage eventMessage) throws IOException {

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        DatumWriter<EventMessage> writer = new SpecificDatumWriter<EventMessage>(EventMessage.getClassSchema());
        writer.write(eventMessage, encoder);
        encoder.flush();
        out.close();
        return out.toByteArray();
    }

这是预期的工作。
但是,希望在主题级别设置一个avro模式,以便在消息不符合avro模式时,主题将拒绝消息。
不管怎样,我可以用ApacheKafka0.10做这个。
谢谢

e5njpo68

e5njpo681#

您可以将confluent的schema registry(其开放源码和apache许可)与ApacheKafka0.10.0结合使用,将模式与主题关联起来。它附带了avro序列化程序/反序列化程序,可以完全按照您请求的方式自动验证avro模式。
请注意,这不是“合流Kafka”这样的东西-这将是一个商标侵犯拥有它。为了方便起见,confluent只是将ApacheKafka打包到其发行版中,但是由于模式注册表位于github上,因此如果您愿意,可以不使用confluent打包而使用它。

相关问题