我正在使用在schema registry中注册的avro模式生成一个kafka记录。
主题是注册的,因为当我http://localhost:8081/主题/玩家值/版本/2
我得到:
{
subject: "player-value",
version: 2,
id: 21,
schema: "{"type":"record","name":"player_value","namespace":"com.mycorp.mynamespace","doc":"Sample schema to help you get started.","fields":[{"name":"id","type":"int","doc":"The int type is a 32-bit signed integer."},{"name":"first_name","type":"string","doc":"The string is a unicode character sequence."}]}"
}
我正在下载模式,然后使用genericord生成一个带有该模式的主题。
我已将主题价值策略设置为recordnamingstragegy。
我创建一个通用记录如下:
Schema schema = new Schema.Parser().parse(subject.schema);
System.out.println(subject.schema);
return new GenericData.Record(schema);
record.put("id", 1);
record.put("first_name", "foobar");
其中subject.schema是:
{"type":"record","name":"player_value","namespace":"com.mycorp.mynamespace","doc":"Sample schema to help you get started.","fields":[{"name":"id","type":"int","doc":"The int type is a 32-bit signed integer."},{"name":"first_name","type":"string","doc":"The string is a unicode character sequence."}]}
但是,当我生成时,我得到以下错误:
SerializationException: Error retrieving Avro schema: {"type":"record","name":"player_value","namespace":"com.mycorp.mynamespace","doc":"Sample schema to help you get started.","fields":[{"name":"id","type":"int","doc":"The int type is a 32-bit signed integer."},{"name":"first_name","type":"string","doc":"The string is a unicode character sequence."}]}
这是我的完整代码(您不需要阅读全部代码):
public static void main(String[] args) throws Exception {
schemaRegistryUtil.downloadSchema("player-value", 2)
.thenApply(subject -> {
Schema schema = new Schema.Parser().parse(subject.schema);
System.out.println(subject.schema);
return new GenericData.Record(schema);
})
.thenApply(record -> {
record.put("id", 1);
record.put("first_name", "Totti");
return record;
})
.thenApply(record -> producer.produce("some-key", record, TOPIC))
.whenCompleteAsync((metadata, throwable) -> {
if (throwable != null) {
System.out.println(String.format("Error happened %s", throwable.getMessage()));
} else {
System.out.println("all good man");
}
});
}
更新
有趣的是,如果我把
properties.setProperty(AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, RecordNameStrategy.class.getName());
效果很好!
1条答案
按热度按时间balp4ylt1#
模式注册表中的模式主题有不同的命名策略。此链接提供了策略的描述,以下是该页的摘录。。
任何实施
io.confluent.kafka.serializers.subject.SubjectNameStrategy
可以指定。默认情况下,<topic>-value
用作主题。默认值为
TopicNameStrategy
包括topic_name
-键或topic-name
-值取决于提供给序列化程序的配置iskey。这允许主题只有一个主题。另一种命名策略是
RecordNameStrategy
根据avro记录命名。从文件里。。对于发布到kafka主题的任何avro记录类型
<topicName>
,在注册表中以主题名称注册架构<topicName>-<recordName>
,在哪里<recordName>
是完全限定的avro记录名称。此策略允许主题包含不同记录类型的混合,因为不执行主题内兼容性检查
此外,不同的主题可能包含相同记录名的互不兼容版本,因为兼容性检查的范围是特定主题中的特定记录名。
有时,即使模式注册表不可访问,也会出现异常,尽管您的情况似乎并非如此。