Kafkaavrò 找不到主题

dl5txlt9  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(276)

我正在使用在schema registry中注册的avro模式生成一个kafka记录。
主题是注册的,因为当我http://localhost:8081/主题/玩家值/版本/2
我得到:

{
subject: "player-value",
version: 2,
id: 21,
schema: "{"type":"record","name":"player_value","namespace":"com.mycorp.mynamespace","doc":"Sample schema to help you get started.","fields":[{"name":"id","type":"int","doc":"The int type is a 32-bit signed integer."},{"name":"first_name","type":"string","doc":"The string is a unicode character sequence."}]}"
}

我正在下载模式,然后使用genericord生成一个带有该模式的主题。
我已将主题价值策略设置为recordnamingstragegy。
我创建一个通用记录如下:

Schema schema = new Schema.Parser().parse(subject.schema);
                    System.out.println(subject.schema);
                    return new GenericData.Record(schema);
record.put("id", 1);
                    record.put("first_name", "foobar");

其中subject.schema是:

{"type":"record","name":"player_value","namespace":"com.mycorp.mynamespace","doc":"Sample schema to help you get started.","fields":[{"name":"id","type":"int","doc":"The int type is a 32-bit signed integer."},{"name":"first_name","type":"string","doc":"The string is a unicode character sequence."}]}

但是,当我生成时,我得到以下错误:

SerializationException: Error retrieving Avro schema: {"type":"record","name":"player_value","namespace":"com.mycorp.mynamespace","doc":"Sample schema to help you get started.","fields":[{"name":"id","type":"int","doc":"The int type is a 32-bit signed integer."},{"name":"first_name","type":"string","doc":"The string is a unicode character sequence."}]}

这是我的完整代码(您不需要阅读全部代码):

public static void main(String[] args) throws Exception {
        schemaRegistryUtil.downloadSchema("player-value", 2)
                .thenApply(subject -> {
                    Schema schema = new Schema.Parser().parse(subject.schema);
                    System.out.println(subject.schema);
                    return new GenericData.Record(schema);
                })
                .thenApply(record -> {
                    record.put("id", 1);
                    record.put("first_name", "Totti");
                    return record;
                })
                .thenApply(record -> producer.produce("some-key", record, TOPIC))
                .whenCompleteAsync((metadata, throwable) -> {
                    if (throwable != null) {
                        System.out.println(String.format("Error happened %s", throwable.getMessage()));
                    } else {
                        System.out.println("all good man");
                    }
                });

    }

更新

有趣的是,如果我把

properties.setProperty(AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, RecordNameStrategy.class.getName());

效果很好!

balp4ylt

balp4ylt1#

模式注册表中的模式主题有不同的命名策略。此链接提供了策略的描述,以下是该页的摘录。。
任何实施 io.confluent.kafka.serializers.subject.SubjectNameStrategy 可以指定。默认情况下, <topic>-value 用作主题。
默认值为 TopicNameStrategy 包括 topic_name -键或 topic-name -值取决于提供给序列化程序的配置iskey。这允许主题只有一个主题。
另一种命名策略是 RecordNameStrategy 根据avro记录命名。从文件里。。
对于发布到kafka主题的任何avro记录类型 <topicName> ,在注册表中以主题名称注册架构 <topicName>-<recordName> ,在哪里 <recordName> 是完全限定的avro记录名称。
此策略允许主题包含不同记录类型的混合,因为不执行主题内兼容性检查
此外,不同的主题可能包含相同记录名的互不兼容版本,因为兼容性检查的范围是特定主题中的特定记录名。
有时,即使模式注册表不可访问,也会出现异常,尽管您的情况似乎并非如此。

相关问题