I have a Spring Cloud application. Here are the consumer application properties:
cloud:
  stream:
    default:
      producer:
        useNativeEncoding: true
      consumer:
        useNativeEncoding: true
    bindings:
      inputtest:
        destination: test
        content-type: application/*+avro
      outputtest:
        destination: test
        content-type: application/*+avro
    kafka:
      streams:
        binder:
          configuration:
            application:
              server: localhost:8082
      binder:
        producer-properties:
          key.serializer: org.apache.kafka.common.serialization.StringSerializer
          value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
          schema.registry.url: http://localhost:8081
        consumer-properties:
          key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
          schema.registry.url: http://localhost:8081
          specific.avro.reader: true
    schema:
      avro:
        dynamicSchemaGenerationEnabled: true
I have also configured a StreamListener:
public void consumeDetails(GenericRecord message) {
    System.out.println(message);
}
This listener receives the GenericRecord without problems, but when I put any Java class in place of GenericRecord, it throws an exception:
@StreamListener(CreateMessageSink.INPUT)
public void consumeDetails(TestMessage message) {
    System.out.println(message);
}
The only exception message I get is:
org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.dataset.CreateMessage] to [com.notebook..TestMessge] for GenericMessage [payload={ "time": 1570614318582, "task": "Create", "userId": "-1", "status": "Success", "severity": "INFO", "details": {"notebookId": "1", "datasetId": "1"}}, headers={kafka_offset=59, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@745a009e, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=dataset-create-test, kafka_receivedTimestamp=1570614318587, contentType=application/*+avro}], failedMessage=GenericMessage [payload={"tenantId": "-1", "time": 1570614318582, "task": "CreateDataset", "userId": "-1", "status": "Success", "source": "dataset-svc", "severity": "INFO", "details": {"notebookId": "d02fd508-f6cc-4d2f-a713-4298bca4e216", "datasetId": "5d9dac2eb7420146d6a79d30"}}, headers={kafka_offset=59, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@745a009e, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=dataset-create-test, kafka_receivedTimestamp=1570614318587, contentType=application/*+avro}]
at org.springframework.cloud.stream.config.SmartPayloadArgumentResolver.resolveArgument(SmartPayloadArgumentResolver.java:126)
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117)