Kafka consumer cannot read Avro records from a topic: Avro deserialization error

new9mtju  posted on 2021-06-04 in Kafka

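I am using Spring Kafka and the Hortonworks Schema Registry in a Kafka consumer. We have an Avro schema defined in the schema registry. The producer validates each record against the Avro schema and then sends it to the Kafka topic as an Avro record. I am using the Hortonworks library for deserialization.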

implementation 'com.hortonworks.registries:schema-registry-serdes:0.3.0'

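However, the records are not being deserialized on the consumer side. The deserializer in use is com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer. The error I get gives no hint about what is wrong with the data; for example, it does not say which data field could not be parsed or deserialized. The error is below.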

Caused by: java.lang.RuntimeException: com.google.common.util.concurrent.UncheckedExecutionException: javax.ws.rs.InternalServerErrorException: HTTP 500 Internal Server Error
        at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.getSchemaVersionInfo(SchemaRegistryClient.java:503)
        at com.hortonworks.registries.schemaregistry.serde.AbstractSnapshotDeserializer.deserialize(AbstractSnapshotDeserializer.java:150)
        at com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:98)
        at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
        at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1310)
        at org.apache.kafka.clients.consumer.internals.Fetcher.access$3500(Fetcher.java:128)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1541)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1377)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:677)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:632)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1290)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1107)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1063)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:988)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.lang.Thread.run(Thread.java:748)
Caused by: com.google.common.util.concurrent.UncheckedExecutionException: javax.ws.rs.InternalServerErrorException: HTTP 500 Internal Server Error
        at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2049)
        at com.google.common.cache.LocalCache.get(LocalCache.java:3849)
        at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3873)
        at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4798)
        at com.hortonworks.registries.schemaregistry.SchemaVersionInfoCache.getSchema(SchemaVersionInfoCache.java:54)
        at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.getSchemaVersionInfo(SchemaRegistryClient.java:499)
        ... 18 more
Caused by: javax.ws.rs.InternalServerErrorException: HTTP 500 Internal Server Error
        at org.glassfish.jersey.client.JerseyInvocation.convertToException(JerseyInvocation.java:954)
        at org.glassfish.jersey.client.JerseyInvocation.translate(JerseyInvocation.java:739)
        at org.glassfish.jersey.client.JerseyInvocation.lambda$invoke$1(JerseyInvocation.java:623)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:205)
        at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:390)
        at org.glassfish.jersey.client.JerseyInvocation.invoke(JerseyInvocation.java:621)
        at org.glassfish.jersey.client.JerseyInvocation$Builder.method(JerseyInvocation.java:404)
        at org.glassfish.jersey.client.JerseyInvocation$Builder.get(JerseyInvocation.java:300)
        at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient$10.run(SchemaRegistryClient.java:756)
        at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient$10.run(SchemaRegistryClient.java:753)
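
The trace above contains three nested causes. Walking the cause chain makes the innermost one explicit: here it is the `InternalServerErrorException` (HTTP 500) raised while the client was fetching the schema version, which suggests the failure happened before any Avro decoding started. The following is a minimal sketch using only the JDK; the class name `RootCause` and the simulated exceptions in `main` are illustrative stand-ins, not part of the original code.

```java
public class RootCause {

    /** Follows getCause() links until the innermost throwable is reached. */
    public static Throwable rootCauseOf(Throwable t) {
        Throwable current = t;
        // Stop when there is no further cause, or if a custom getCause()
        // returns the throwable itself.
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Simulated chain shaped like the trace above (stand-in exception types).
        Throwable http = new IllegalStateException("HTTP 500 Internal Server Error");
        Throwable unchecked = new RuntimeException(http);
        Throwable outer = new RuntimeException(unchecked);

        Throwable root = rootCauseOf(outer);
        System.out.println(root.getClass().getName() + ": " + root.getMessage());
    }
}
```

In a Spring Kafka application, a helper like this could be called from whatever error handling is already in place, so that the log line names the registry failure instead of the outer `RuntimeException`.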

Consumer configuration:

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
// Keys are plain strings (class name is StringDeserializer, not StringDeserialize).
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
// Values are decoded by the Hortonworks Avro deserializer, which looks up the writer schema in the registry.
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommitFlag);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, conMaxPollRecords);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, conMinFetchBytes);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, conMaxWaitMS);
// Settings read by the Hortonworks serdes.
props.put("schema.registry.url", schemaRegistryUrl);
props.put("specific.avro.reader", true);
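
To see what is actually on the topic independently of the registry, one option is to temporarily set the value deserializer in this map to Kafka's `org.apache.kafka.common.serialization.ByteArrayDeserializer` and inspect the leading bytes of each value. The Hortonworks serializer presumably prefixes the Avro payload with a protocol id byte followed by schema identification; the exact layout depends on the serdes version and is an assumption here. The sketch below only renders the first bytes as hex; `PayloadHeader` and `hexPrefix` are hypothetical names, and the sample bytes are made up.

```java
public class PayloadHeader {

    /** Renders up to maxBytes leading bytes of a record value as hex pairs. */
    public static String hexPrefix(byte[] value, int maxBytes) {
        if (value == null) {
            return "<null>";
        }
        int n = Math.min(maxBytes, value.length);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < n; i++) {
            if (i > 0) {
                sb.append(' ');
            }
            // Mask to 0..255 so negative bytes print as two hex digits.
            sb.append(String.format("%02x", value[i] & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Made-up bytes for illustration; real values would come from ConsumerRecord.value().
        byte[] sample = {0x03, 0x00, 0x00, 0x00, 0x2a, 0x10, 0x66};
        System.out.println(hexPrefix(sample, 5)); // prints "03 00 00 00 2a"
    }
}
```

If the leading bytes differ between records that deserialize and records that fail, or do not look like a header at all, that would point toward the producer rather than the registry.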

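Two observations:
From the log, it looks as if the deserializer fails at the very first step, fetching the schema from the schema registry. However, we checked that the schema exists in the registry with the correct schema ID used to serialize the messages, and the schema registry URL is correct.
It is possible that the data produced to the topic is malformed, for example because of a bug in the producer code (a validation or serialization problem on the producer side). In that case, though, the exception trace should point to the line where the code actually performs deserialization, somewhere in the method below. The following is a snippet from com.hortonworks.registries.schemaregistry.serdes.avro.AbstractAvroSnapshotDeserializer.
protected Object buildDeserializedObject(byte protocolId, InputStream payloadInputStream, SchemaMetadata schemaMetadata, Integer writerSchemaVersion, Integer readerSchemaVersion) throws SerDesException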
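
Regarding the first observation: an HTTP 500 is returned by whatever server answered the request (the registry itself or a proxy in front of it), so the response body and the server-side log are the likeliest places to find the real cause. Below is a minimal sketch, using only the JDK, that issues the same kind of GET directly and prints the status and body. `RegistryProbe` is a hypothetical helper, and the endpoint URL is passed in rather than hard-coded because the exact REST path for a schema-version lookup depends on the registry version and is not shown in the question.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class RegistryProbe {

    /** Holds the HTTP status and the response body (also for error responses). */
    public static final class Result {
        public final int status;
        public final String body;

        Result(int status, String body) {
            this.status = status;
            this.body = body;
        }
    }

    /** Issues a GET against the given URL and returns the status plus body. */
    public static Result get(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        conn.setRequestProperty("Accept", "application/json");
        conn.setConnectTimeout(5000);
        conn.setReadTimeout(5000);
        try {
            int status = conn.getResponseCode();
            // For 4xx/5xx responses the body is only available via the error stream.
            InputStream in = status >= 400 ? conn.getErrorStream() : conn.getInputStream();
            return new Result(status, in == null ? "" : readAll(in));
        } finally {
            conn.disconnect();
        }
    }

    /** Reads the whole stream as UTF-8 text and closes it. */
    private static String readAll(InputStream in) throws IOException {
        try (InputStream input = in) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int n;
            while ((n = input.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: RegistryProbe <full-registry-endpoint-url>");
            return;
        }
        // args[0]: full URL of the registry endpoint to test (assumed to be known
        // from the registry's own API documentation for the deployed version).
        Result r = get(args[0]);
        System.out.println("HTTP " + r.status);
        System.out.println(r.body);
    }
}
```

Running this from the same host as the consumer also helps rule out network or proxy differences between the machine where the schema was checked and the machine where the consumer runs.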
