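I am using Spring Kafka with the Hortonworks Schema Registry in a Kafka consumer. We have an Avro schema defined in the schema registry. The producer validates each record against the Avro schema and then sends the Avro record to the Kafka topic. I am using the Hortonworks library for deserialization.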
implementation 'com.hortonworks.registries:schema-registry-serdes:0.3.0'
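However, the records are not being deserialized on the consumer side. The deserializer in use is com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer.
The error I get gives no hint about what is wrong with the data; for example, it does not say which field could not be parsed or deserialized. The error is below.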
Caused by: java.lang.RuntimeException: com.google.common.util.concurrent.UncheckedExecutionException: javax.ws.rs.InternalServerErrorException: HTTP 500 Internal Server Error
at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.getSchemaVersionInfo(SchemaRegistryClient.java:503)
at com.hortonworks.registries.schemaregistry.serde.AbstractSnapshotDeserializer.deserialize(AbstractSnapshotDeserializer.java:150)
at com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:98)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1310)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3500(Fetcher.java:128)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1541)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1377)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:677)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:632)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1290)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1107)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1063)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:988)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.google.common.util.concurrent.UncheckedExecutionException: javax.ws.rs.InternalServerErrorException: HTTP 500 Internal Server Error
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2049)
at com.google.common.cache.LocalCache.get(LocalCache.java:3849)
at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3873)
at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4798)
at com.hortonworks.registries.schemaregistry.SchemaVersionInfoCache.getSchema(SchemaVersionInfoCache.java:54)
at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.getSchemaVersionInfo(SchemaRegistryClient.java:499)
... 18 more
Caused by: javax.ws.rs.InternalServerErrorException: HTTP 500 Internal Server Error
at org.glassfish.jersey.client.JerseyInvocation.convertToException(JerseyInvocation.java:954)
at org.glassfish.jersey.client.JerseyInvocation.translate(JerseyInvocation.java:739)
at org.glassfish.jersey.client.JerseyInvocation.lambda$invoke$1(JerseyInvocation.java:623)
at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
at org.glassfish.jersey.internal.Errors.process(Errors.java:205)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:390)
at org.glassfish.jersey.client.JerseyInvocation.invoke(JerseyInvocation.java:621)
at org.glassfish.jersey.client.JerseyInvocation$Builder.method(JerseyInvocation.java:404)
at org.glassfish.jersey.client.JerseyInvocation$Builder.get(JerseyInvocation.java:300)
at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient$10.run(SchemaRegistryClient.java:756)
at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient$10.run(SchemaRegistryClient.java:753)
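The innermost cause in the trace above is an HTTP 500 returned to the client's GET request, so the server's response body (and the registry's own server log) is probably more informative than the consumer-side exception. A minimal sketch for repeating such a GET by hand and printing the status and body follows. The class name `RegistryProbe` is hypothetical, and the URL is taken as an argument because the exact REST path depends on the registry version and is not shown in the trace.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the Hortonworks client library.
public class RegistryProbe {

    /** HTTP status plus whatever body the server returned. */
    public static final class Result {
        public final int status;
        public final String body;

        Result(int status, String body) {
            this.status = status;
            this.body = body;
        }
    }

    /** Issues a plain GET and returns the status and body, including for 4xx/5xx. */
    public static Result probe(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        conn.setRequestProperty("Accept", "application/json");
        conn.setConnectTimeout(5000);
        conn.setReadTimeout(5000);
        try {
            int status = conn.getResponseCode();
            // For error responses the body is on the error stream, not the input stream.
            InputStream in = status >= 400 ? conn.getErrorStream() : conn.getInputStream();
            return new Result(status, in == null ? "" : readAll(in));
        } finally {
            conn.disconnect();
        }
    }

    private static String readAll(InputStream in) throws IOException {
        try (InputStream stream = in) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int n;
            while ((n = stream.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: RegistryProbe <full URL of the registry request to repeat>");
            return;
        }
        Result r = probe(args[0]);
        System.out.println("HTTP " + r.status);
        System.out.println(r.body);
    }
}
```

If the same request also returns 500 outside the consumer, the problem is on the registry side (or in what the request asks for) rather than in the Avro decoding step.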
Consumer configuration:
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommitFlag);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, conMaxPollRecords);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, conMinFetchBytes);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, conMaxWaitMS);
props.put("schema.registry.url", schemaRegistryUrl);
props.put("specific.avro.reader", true);
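Two points here:
From the log, it looks like the deserializer fails at the first step, fetching the schema from the schema registry. However, we verified that the schema exists with the correct schema ID, the one used to serialize the messages, and the schema registry URL is correct.
It is possible that the data produced to the topic is not in the correct format, for example because of a bug in the producer code (a validation or serialization problem on the producer side). In that case, though, I would expect the exception trace to point to the line where deserialization actually happens, somewhere in the method below. The following is a snippet from com.hortonworks.registries.schemaregistry.serdes.avro.AbstractAvroSnapshotDeserializer.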
protected Object buildDeserializedObject(byte protocolId, InputStream payloadInputStream, SchemaMetadata schemaMetadata, Integer writerSchemaVersion, Integer readerSchemaVersion) throws SerDesException
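One way to test the second point is to read the raw record bytes (for example by temporarily switching the value deserializer to Kafka's `ByteArrayDeserializer`) and look at the first few bytes of the payload. The `protocolId` parameter in the signature above suggests the serializer writes a small header before the Avro data, though the exact header layout is not shown here and is not assumed. The sketch below only renders a hex prefix; the class name `PayloadPeek` and the sample bytes are made up for illustration.

```java
// Hypothetical helper for eyeballing the start of a raw Kafka record value.
public class PayloadPeek {

    /** Returns the first maxBytes bytes of payload as space-separated lowercase hex. */
    public static String hexPrefix(byte[] payload, int maxBytes) {
        if (payload == null) {
            return "<null>";
        }
        int n = Math.min(Math.max(maxBytes, 0), payload.length);
        StringBuilder sb = new StringBuilder(n * 3);
        for (int i = 0; i < n; i++) {
            if (i > 0) {
                sb.append(' ');
            }
            // Mask to 0..255 so negative bytes print as two hex digits.
            sb.append(String.format("%02x", payload[i] & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Made-up bytes, not a real record and not a statement about the wire format.
        byte[] sample = {0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x2a};
        System.out.println(hexPrefix(sample, 16));
    }
}
```

Comparing the prefix of a failing record with one produced by a known-good producer would show whether the two were written with the same header, and therefore whether the consumer is asking the registry for a schema version that actually exists.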