为什么我在用我的应用程序反序列化这个avro模式而不是用kafka avro控制台消费者反序列化这个avro模式时会出现“未知魔法字节”错误?

svmlkihl  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(217)

我不断地将avro格式的数据发送到一个名为“sd\u rtl”的主题。这种数据生成可以通过符合自定义avro模式的汇合datagen实现。
当我在这个主题上使用kafka avro console consumer时,我正确地看到了我的数据。下面是一个正确接收的元组的示例: {"_id":1276215,"serialno":"0","timestamp":416481,"locationid":"Location_0","gpscoords":{"latitude":-2.9789479087622794,"longitude":-4.344459940322691},"data":{"tag1":0,"tag2":1}} 当我试图通过java应用程序使用这些数据时,问题就出现了。我得到错误“未知的魔术字节”。
我使用的代码灵感来自confluent网站序列化程序部分下的第二个片段。
这是我的密码:

//consumer properties
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");

        //string inputs and outputs
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "localhost:8081");

        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        //subscribe to topic
        String topic = "SD_RTL";
        final Consumer<String, SensorsPayload> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));

        try {
            while (true) {
                ConsumerRecords<String, SensorsPayload> records = consumer.poll(100);
                for (ConsumerRecord<String, SensorsPayload> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }

我的代码和confluent的代码有点不同。例如,confluent使用以下行: final Consumer<String, GenericRecord> consumer = new KafkaConsumer<String, String>(props); 如果我把正确的部分 <String, String> 而不是 <String, SensorsPayload> ,intellij抱怨类型不兼容。我不确定是否与我的问题有关。
我已经通过avro maven插件从avro模式自动生成了sensorspayload类。
为什么我的消费者应用程序会产生一个“未知魔法字节”错误,而Kafka的avro控制台消费者却没有?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题