java—如何使用avro序列化程序和模式注册表向kafka发送消息

ny6fqffe  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(610)

我试图用avro序列化程序和模式注册表向kafka发送一个对象。
下面是一个简化的代码:

Properties props = new Properties();
    ...
    props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    props.put(SCHEMA_REGISTRY_URL_CONFIG, "http://" + schemaRegistryHostname + ":8081");

    Producer<String, User> producer = new KafkaProducer(properties);

    User user = new User("name", "address", 123);
    ProducerRecord record = new ProducerRecord<>(topic, key, user);
    producer.send(record);

我假设模式是从注册表“幕后”读取的,对象(用户)是序列化的,但是我得到下面的错误。
我错过了什么?
我必须显式地读取模式并发送genericord吗?
org.apache.kafka.common.errors.serializationexception:序列化avro消息时出错
原因:java.lang.illegalargumentexception:不支持的avro类型。支持的类型有null、boolean、integer、long、float、double、string、byte[]和indexedrecord
在io.confluent.kafka.serializers.abstractkafkaavroserde.getschema(abstractkafkaavroserde。java:123)~[kafka-avro-serializer-3.3.0.jar!/:?]
位于io.confluent.kafka.serializers.abstractkafkaavroserializer.serializeimpl(abstractkafkaavroserializer)。java:73)~[kafka-avro-serializer-3.3.0.jar!/:?]
在io.confluent.kafka.serializers.kafkaavroserializer.serialize(kafkaavroserializer。java:53)~[kafka-avro-serializer-3.3.0.jar!/:?]
在org.apache.kafka.clients.producer.kafkaproducer.send(kafkaproducer。java:424)~[kafka-clients-0.9.0.1.jar!/:?]

smtd7mpg

smtd7mpg1#

你的代码似乎是正确的,你一定没有用mvn从avsc文件创建正确的结构,用maven生成源代码(在项目文件夹的终端上传递这个命令)
接下来,它将创建一个bean,您可以在其中传递值作为

User order = User.newBuilder()
        .setName("xyz")
        .setAddress("CId432")
        .setPrice("123")
        .build();
snvhrwxg

snvhrwxg2#

你的代码似乎是正确的。唯一可能缺少的是,你的avro对象没有正确地用一些avro插件生成,这意味着你的类需要实现 SpecificRecords 哪个实现了 IndexedRecord .

相关问题