kafka avro序列化程序和反序列化程序异常avro支持的类型

4ngedf3f  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(473)

我看到以下错误

exception Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord

我的Kafka制作人道具是

Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BOOTSTRAP_SERVERS);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1000);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    props.put("schema.registry.url", "http://localhost:8081");
    props.put("value.converter.schema.registry.url", "http://localhost:8081");
    props.put("producer.type", "sync");
    props.put(ProducerConfig.CLIENT_ID_CONFIG, KafkaConstants.CLIENT_ID);

    Producer<String, TweetInfoDto> producer = new KafkaProducer(props);

我的Kafka消费道具是

Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "twitterCrawler");
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, KafkaConstants.CLIENT_ID);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put("schema.registry.url", "http://localhost:8081");
    props.put("value.converter.schema.registry.url", "http://localhost:8081");

    Consumer<String, TweetInfoDto> consumer = new KafkaConsumer(props);

不知道我做错了什么。

bihw5rsg

bihw5rsg1#

TweetInfoDto 不能是您自己定义的普通java对象。
理想情况下,它应该通过avro maven插件从avro模式创建。
请参考schema registry教程了解所有步骤,包括定义avsc和为其生成java类。
此处是教程示例代码

yxyvkwin

yxyvkwin2#

除了cricket\u007提到的内容之外,我们可以考虑使用avro工具——通过代码生成进行序列化和反序列化

相关问题