我的 KafkaProducer
能够使用 KafkaAvroSerializer
将对象序列化到我的主题。然而, KafkaConsumer.poll()
返回反序列化的 GenericRecord
而不是我的序列化类。
mykafkaproducer公司
KafkaProducer<CharSequence, MyBean> producer;
try (InputStream props = Resources.getResource("producer.props").openStream()) {
Properties properties = new Properties();
properties.load(props);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
properties.put("schema.registry.url", "http://localhost:8081");
MyBean bean = new MyBean();
producer = new KafkaProducer<>(properties);
producer.send(new ProducerRecord<>(topic, bean.getId(), bean));
我的Kafka苏美尔
try (InputStream props = Resources.getResource("consumer.props").openStream()) {
properties.load(props);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
properties.put("schema.registry.url", "http://localhost:8081");
consumer = new KafkaConsumer<>(properties);
}
consumer.subscribe(Arrays.asList(topic));
try {
while (true) {
ConsumerRecords<CharSequence, MyBean> records = consumer.poll(100);
if (records.isEmpty()) {
continue;
}
for (ConsumerRecord<CharSequence, MyBean> record : records) {
MyBean bean = record.value(); // <-------- This is throwing a cast Exception because it cannot cast GenericRecord to MyBean
System.out.println("consumer received: " + bean);
}
}
``` `MyBean bean = record.value();` 该行抛出一个强制转换异常,因为它无法将genericrecord强制转换为mybean。
我在用 `kafka-client-0.9.0.1` , `kafka-avro-serializer-3.0.0` .
2条答案
按热度按时间9bfwbjaz1#
kafkaavroderializer支持特定数据
默认情况下不启用。要启用它:
kafkaavroderializer不支持reflectdata
汇合
KafkaAvroDeserializer
不知道如何使用avro reflectdata反序列化。我必须扩展它以支持avro reflectdata:定义一个自定义反序列化程序类,该类将反序列化为
MyBean
:配置
KafkaConsumer
要使用自定义反序列化程序类,请执行以下操作:xcitsw882#
编辑:反射数据支持被合并(见下文)
为了让chin huang的答案更进一步,为了获得最少的代码和更好的性能,您可能应该这样实现它: