To get a better understanding of the platform, I have been playing with Confluent Kafka for a few days. I am getting serialization exceptions for some malformed Avro messages sent to a topic. Let me explain the problem with the facts:
<kafka.new.version>0.10.2.0-cp1</kafka.new.version>
<confluent.version>3.2.0</confluent.version>
<avro.version>1.7.7</avro.version>
Intention: very simple, a producer is sending Avro records and the consumer should consume all records without any problem (it is fine for it to skip any message that is not compatible with the schema in the Schema Registry). Usage:
Producer ->
Key -> StringSerializer
Value -> KafkaAvroSerializer
Consumer ->
Key -> StringDeserializer
Value -> KafkaAvroDeserializer
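For completeness, the producer side listed above is wired up roughly as in the sketch below (a minimal sketch only; the bootstrap server, topic name, and the way the TestUser record is built are placeholders rather than my exact code):

// Minimal producer sketch; needs org.apache.kafka.clients.producer.* and the Avro-generated TestUser class.
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "somehost:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);
producerProps.put("schema.registry.url", "schemaregistryhost:8081");
KafkaProducer<String, TestUser> producer = new KafkaProducer<>(producerProps);
TestUser user = TestUser.newBuilder().setTestname("some-name").build();  // field name taken from the exception below
producer.send(new ProducerRecord<>("topic.ongo.test3.user14", "some-key", user));
producer.close();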
Other consumer properties (just FYI):
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "somehost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "myconsumer-4");
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "someclient-4");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
properties.put("schema.registry.url", "schemaregistryhost:8081");
I was able to consume messages without any problem until another producer sent a bad message to this topic by mistake and changed the latest schema in the Schema Registry (we have an option enabled in the Schema Registry so that any message can be sent to the topic and the Schema Registry registers a new schema version every time; we could also switch this off).
Now, because of this bad message, poll() fails with a serialization exception. It does give me the offset where it failed, and I can pass that offset to seek(), but that does not feel right (a sketch of that workaround follows the config line below). I also tried setting max poll records to 10 and giving poll() a very small timeout so that I would skip at most 10 records by catching the exception, but for some reason max poll records does not take effect and the code fails immediately with the serialization error, even though I start from the beginning and the bad message sits at offset 240.
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");
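The seek()-based workaround mentioned above looks roughly like this (a sketch only: the 0.10.2 consumer does not expose the failing partition and offset programmatically, so parsing them out of the exception message with a regex is an assumption on my side, and exactly why this does not feel like a proper solution):

// Needs org.apache.kafka.common.TopicPartition, org.apache.kafka.common.errors.SerializationException,
// java.util.regex.Pattern and java.util.regex.Matcher.
Pattern offsetPattern = Pattern.compile("for partition (.+)-(\\d+) at offset (\\d+)");
while (true) {
    try {
        ConsumerRecords<String, TestUser> records = consumer.poll(1000);
        for (ConsumerRecord<String, TestUser> record : records) {
            // process the record
        }
    } catch (SerializationException e) {
        Matcher m = offsetPattern.matcher(e.getMessage());
        if (m.find()) {
            TopicPartition tp = new TopicPartition(m.group(1), Integer.parseInt(m.group(2)));
            consumer.seek(tp, Long.parseLong(m.group(3)) + 1);  // skip the poison record
        } else {
            throw e;  // not the failure we know how to skip
        }
    }
}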
Another simple solution would be to use a ByteArrayDeserializer together with KafkaAvroDecoder in my application, so that I can handle deserialization issues myself.
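A sketch of that alternative (I use KafkaAvroDeserializer directly on the raw bytes here; byteConsumer is assumed to be a consumer configured with ByteArrayDeserializer for the value, and the registry URL is the same placeholder as above):

// Decode the Avro payload by hand so a malformed record only fails inside the try block
// and the rest of the batch is still processed.
Map<String, Object> serdeConfig = new HashMap<>();
serdeConfig.put("schema.registry.url", "schemaregistryhost:8081");
serdeConfig.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
KafkaAvroDeserializer valueDeserializer = new KafkaAvroDeserializer();
valueDeserializer.configure(serdeConfig, false);  // false = configuring a value deserializer

ConsumerRecords<String, byte[]> rawRecords = byteConsumer.poll(1000);
for (ConsumerRecord<String, byte[]> record : rawRecords) {
    try {
        TestUser user = (TestUser) valueDeserializer.deserialize(record.topic(), record.value());
        // handle the successfully decoded record
    } catch (SerializationException e) {
        // log and skip the malformed message instead of failing the whole poll()
    }
}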
I believe I am missing something or not doing something right. Adding the exception as well:
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic.ongo.test3.user14-0 at offset 220
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 186
Caused by: org.apache.avro.AvroTypeException: Found com.catapult.TestUser, expecting com.catapult.TestUser, missing required field testname
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:130)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:176)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:131)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92)
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:54)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:869)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:775)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:473)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1062)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
1 Answer
Found that there is already an open JIRA ticket for the same issue: https://issues.apache.org/jira/browse/kafka-4740