I am using Kafka with Avro serialization: http://docs.spring.io/spring-cloud-stream/docs/current-snapshot/reference/htmlsingle/#_avro_schema_registry_client_message_converters
I registered the schema in a local schema registry and added @EnableSchemaRegistryClient.
My settings in application.yml:
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: output
        input:
          destination: topicName
          contentType: application/*+avro
    schemaRegistryClient:
      endpoint: http://127.0.0.1:8081
I am also using avro-tools-1.8.1.jar. Now I am trying to read and convert the messages from Kafka:
@StreamListener(Sink.INPUT)
public void handlePublish(MyClass message) throws IOException {
    logger.info("Receiving MyClass" + message);
}
But the converter fails with: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative
Stack trace:
org.springframework.messaging.MessagingException: Exception thrown while invoking kafka.Consumer#handlePublish[1 args]; nested exception is org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -62
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor$StreamListenerMessageHandler.handleRequestMessage(StreamListenerAnnotationBeanPostProcessor.java:364) ~[spring-cloud-stream-1.1.1.RELEASE.jar:1.1.1.RELEASE]
....
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -62
at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336) ~[avro-1.8.1.jar:1.8.1]
...at org.springframework.cloud.stream.schema.avro.AbstractAvroMessageConverter.convertFromInternal(AbstractAvroMessageConverter.java:91) ~[spring-cloud-stream-schema-1.1.1.RELEASE.jar:1.1.1.RELEASE]
at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:175) ~[spring-messaging-4.3.4.RELEASE.jar:4.3.4.RELEASE]
at org.springframework.messaging.converter.CompositeMessageConverter.fromMessage(CompositeMessageConverter.java:67) ~[spring-messaging-4.3.4.RELEASE.jar:4.3.4.RELEASE]
at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:117) ~[spring-messaging-4.3.4.RELEASE.jar:4.3.4.RELEASE]
....
1 Answer
I noticed that the property you are using above is spring.cloud.schemaRegistryClient, but it needs to be spring.cloud.stream.schemaRegistryClient.
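As a rough sketch of that change, the application.yml from the question might look like the following once schemaRegistryClient is nested under stream. The binding names, topic, and endpoint are copied from the question. The only assumption is the one stated above, that the property belongs under spring.cloud.stream; this has not been checked against the asker's setup.

```yaml
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: output
        input:
          destination: topicName
          contentType: application/*+avro
      # Moved one level deeper than in the question, so the full key becomes
      # spring.cloud.stream.schemaRegistryClient.endpoint
      schemaRegistryClient:
        endpoint: http://127.0.0.1:8081
```

With the key in its original position (spring.cloud.schemaRegistryClient), the endpoint setting would presumably not be read by the schema registry client at all.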