avro与kafka

nbnkbykc  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(442)

我从baeldung学习了教程。他们提到创建模式有两种方法。
通过编写json表示并添加maven插件来生成类
通过使用 SchemaBuilder ,他们也提到这是一个更好的选择。
不幸的是,在git示例中,我只看到json的方式。
假设我有一个avro模式:

{
  "type":"record",
  "name":"TestFile",
  "namespace":"com.example.kafka.data.ingestion.model",
  "fields":[
    {
      "name":"date",
      "type":"long"
    },
    {
      "name":"counter",
      "type":"int"
    },
    {
      "name":"mc",
      "type":"string"
    }
  ]
}

通过在我的pom文件中添加此插件:

<plugin>
   <groupId>org.apache.avro</groupId>
   <artifactId>avro-maven-plugin</artifactId>
   <version>1.8.0</version>
   <executions>
      <execution>
         <id>schemas</id>
         <phase>generate-sources</phase>
         <goals>
            <goal>schema</goal>
            <goal>protocol</goal>
            <goal>idl-protocol</goal>
         </goals>
         <configuration>
            <sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory>
            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
         </configuration>
      </execution>
   </executions>
</plugin>

并用生成源进行构建 TestFile.java 是为了我说的目的地。如果要发送到Kafka主题,我可以执行以下操作:

TestFile test = TestFile.newBuilder()
                                            .setDate(102928374747)
                                            .setCounter(2)
                                            .setMc("Some string")
                                            .build();
kafkaTemplate.send(topicName, test);

相当于用 SchemaBuilder 可能是:

Schema testFileSchema = SchemaBuilder   .record("TestFile")
                                            .namespace("com.example.kafka.data.ingestion.model")
                                            .fields()
                                            .requiredLong("date")
                                            .requiredInt("counter")
                                            .requiredString("mc")
                                            .endRecord();

但是现在如何生成pojo并发送 TestFile 数据到我的Kafka主题?

smdncfj3

smdncfj31#

你将无法访问 TestFile 对象,因为架构是在运行时生成的,而不是预编译的。如果你想保留pojo,那么你需要一个 public TestFile(GenericRecord avroRecord) 你需要创建一个 GenericRecord 用那个 Schema 对象,就像从字符串或文件解析它一样。
例如,

Schema schema = SchemaBuilder.record("TestFile")
            .namespace("com.example.kafka.data.ingestion.model")
            .fields()
            .requiredLong("date")
            .requiredInt("counter")
            .requiredString("mc")
            .endRecord();

GenericRecord entry1 = new GenericData.Record(schema);
entry1.put("date", 1L);
entry1.put("counter", 2);
entry1.put("mc", "3");

// producer.send(new ProducerRecord<>(topic, entry1);

一个完整的Kafka的例子可以从合流
如果put不包含必填字段,它将抛出一个错误,并且不会检查类型的值(我可以将 "counter", "2" ,它会发送一个字符串值(这对我来说似乎是个bug)。基本上, GenericRecord == HashMap<String, Object> 增加了Required/nullable字段的好处。
您需要配置一个avro序列化程序,比如confluent,它需要运行它们的模式注册表,或者像cloudera这样的版本
否则,需要将avro对象转换为 byte[] (如链接所示,只需使用 ByteArraySerializer

相关问题