我不断地将avro格式的数据发送到一个名为“sd\u rtl”的主题。这种数据生成可以通过符合自定义avro模式的汇合datagen实现。
当我在这个主题上使用kafka avro console consumer时,我正确地看到了我的数据。下面是一个正确接收的元组的示例: {"_id":1276215,"serialno":"0","timestamp":416481,"locationid":"Location_0","gpscoords":{"latitude":-2.9789479087622794,"longitude":-4.344459940322691},"data":{"tag1":0,"tag2":1}}
当我试图通过java应用程序使用这些数据时,问题就出现了。我得到错误“未知的魔术字节”。
我使用的代码灵感来自confluent网站序列化程序部分下的第二个片段。
这是我的密码:
//consumer properties
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
//string inputs and outputs
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "localhost:8081");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
//subscribe to topic
String topic = "SD_RTL";
final Consumer<String, SensorsPayload> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
try {
while (true) {
ConsumerRecords<String, SensorsPayload> records = consumer.poll(100);
for (ConsumerRecord<String, SensorsPayload> record : records) {
System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
}
}
} finally {
consumer.close();
}
我的代码和confluent的代码有点不同。例如,confluent使用以下行: final Consumer<String, GenericRecord> consumer = new KafkaConsumer<String, String>(props);
如果我把正确的部分 <String, String>
而不是 <String, SensorsPayload>
,intellij抱怨类型不兼容。我不确定是否与我的问题有关。
我已经通过avro maven插件从avro模式自动生成了sensorspayload类。
为什么我的消费者应用程序会产生一个“未知魔法字节”错误,而Kafka的avro控制台消费者却没有?
暂无答案!
目前还没有任何答案,快来回答吧!