我编写了一个小java类来测试avro编码的kafka主题的使用。
Properties appProps = new Properties();
appProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://***kfk14bro1.lc:9092");
appProps.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://***kfk14str1.lc:8081");
appProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "consumer");
appProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
appProps.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,LogAndContinueExceptionHandler.class);
StreamsBuilder streamsBuilder = new StreamsBuilder();
streamsBuilder.stream(
"coordinates", Consumed.with(Serdes.String(), new GenericAvroSerde()))
.peek((key, value) -> System.out.println("key=" + key + ", value=" + value));
new KafkaStreams(streamsBuilder.build(), appProps).start();
当我运行这个类时,serdeconfigues被正确地记录下来,这可以在下面的日志中看到:
[consumer-56b0e0ca-d336-45cc-b388-46a68dbfab8b-StreamThread-1] INFO io.confluent.kafka.serializers.KafkaAvroSerializerConfig - KafkaAvroSerializerConfig values:
schema.registry.url = [http://***kfk14str1.lc:8081]
basic.auth.user.info = [hidden]
auto.register.schemas = true
max.schemas.per.subject = 1000
basic.auth.credentials.source = URL
schema.registry.basic.auth.user.info = [hidden]
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
[normal-consumer-56b0e0ca-d336-45cc-b388-46a68dbfab8b-StreamThread-1] INFO io.confluent.kafka.serializers.KafkaAvroDeserializerConfig - KafkaAvroDeserializerConfig values:
schema.registry.url = [http://***kfk14str1.lc:8081]
basic.auth.user.info = [hidden]
auto.register.schemas = true
max.schemas.per.subject = 1000
basic.auth.credentials.source = URL
schema.registry.basic.auth.user.info = [hidden]
specific.avro.reader = false
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
但消息未被使用,并为每条消息生成以下日志:
[normal-consumer-56b0e0ca-d336-45cc-b388-46a68dbfab8b-StreamThread-1] WARN org.apache.kafka.streams.errors.LogAndContinueExceptionHandler - Exception caught during Deserialization, taskId: 0_0, topic: coordinates, partition: 0, offset: 782205986
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 83
Caused by: java.lang.NullPointerException
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
at io.confluent.kafka.streams.serdes.avro.GenericAvroDeserializer.deserialize(GenericAvroDeserializer.java:63)
at io.confluent.kafka.streams.serdes.avro.GenericAvroDeserializer.deserialize(GenericAvroDeserializer.java:39)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:58)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:60)
但是我能够从avro控制台消费者那里很好地阅读,所以我知道写入主题的数据没有任何问题。下面的命令打印日志:
~/kafka/confluent-5.1.2/bin/kafka-avro-console-consumer --bootstrap-server http://***kfk14bro1.lc:9092 --topic coordinates --property schema.registry.url=http://***kfk14str1.lc:8081 --property auto.offset.reset=latest
1条答案
按热度按时间wrrgggsh1#
当您自己示例化一个avro serde时,它不会自动使用schema registry url进行配置。
因此,您要么自己配置,要么通过添加以下内容来定义默认serde:
通过移除
要配置serde,请使用以下代码(根据您的情况进行调整):