我从baeldung学习了教程。他们提到创建模式有两种方法。
通过编写json表示并添加maven插件来生成类
通过使用 SchemaBuilder
,他们也提到这是一个更好的选择。
不幸的是,在git示例中,我只看到json的方式。
假设我有一个avro模式:
{
"type":"record",
"name":"TestFile",
"namespace":"com.example.kafka.data.ingestion.model",
"fields":[
{
"name":"date",
"type":"long"
},
{
"name":"counter",
"type":"int"
},
{
"name":"mc",
"type":"string"
}
]
}
通过在我的pom文件中添加此插件:
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.8.0</version>
<executions>
<execution>
<id>schemas</id>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
<goal>protocol</goal>
<goal>idl-protocol</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
并用生成源进行构建 TestFile.java
是为了我说的目的地。如果要发送到Kafka主题,我可以执行以下操作:
TestFile test = TestFile.newBuilder()
.setDate(102928374747)
.setCounter(2)
.setMc("Some string")
.build();
kafkaTemplate.send(topicName, test);
相当于用 SchemaBuilder
可能是:
Schema testFileSchema = SchemaBuilder .record("TestFile")
.namespace("com.example.kafka.data.ingestion.model")
.fields()
.requiredLong("date")
.requiredInt("counter")
.requiredString("mc")
.endRecord();
但是现在如何生成pojo并发送 TestFile
数据到我的Kafka主题?
1条答案
按热度按时间smdncfj31#
你将无法访问
TestFile
对象,因为架构是在运行时生成的,而不是预编译的。如果你想保留pojo,那么你需要一个public TestFile(GenericRecord avroRecord)
你需要创建一个GenericRecord
用那个Schema
对象,就像从字符串或文件解析它一样。例如,
一个完整的Kafka的例子可以从合流
如果put不包含必填字段,它将抛出一个错误,并且不会检查类型的值(我可以将
"counter", "2"
,它会发送一个字符串值(这对我来说似乎是个bug)。基本上,GenericRecord == HashMap<String, Object>
增加了Required/nullable字段的好处。您需要配置一个avro序列化程序,比如confluent,它需要运行它们的模式注册表,或者像cloudera这样的版本
否则,需要将avro对象转换为
byte[]
(如链接所示,只需使用ByteArraySerializer