我正在尝试使用springcloud和kafkaavro序列化程序生成一个关于kafka的事件。
在my application.yml中,我有下面的配置,但是当序列化程序尝试生成消息时,它会以字节的形式生成,因为传递给kafkaserializer中getscheme方法的对象是字节数组,而不是genericord。我想我需要在springcloud中使用一个特定的messageconverter,但是我没有找到。
cloud:
stream:
kafka:
binder:
brokers:
- 'localhost:9092'
useNativeDecoding: true
bindings:
Ptr-output:
producer:
configuration:
schema.registry.url: 'http://localhost:8081'
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: com.abc.message.ptr.KafkaSerializer
schemaRegistryClient:
endpoint: 'http://localhost:8081'
bindings:
Ptr-output:
contentType: application/*+avro
destination: Ptr
schema:
avro:
schema-locations: 'classpath:avro/Ptr.avsc'
dynamic-schema-generation-enabled: false
我该怎么办?我该怎么解决?
2条答案
按热度按时间gj3fmq9x1#
请参阅文档,特别是您需要设置producer属性
useNativeEncoding
.使用的编码
当设置为true时,出站消息由客户机库直接序列化,客户机库必须进行相应配置(例如,设置适当的kafka生产者值序列化程序)。使用此配置时,出站消息封送不基于绑定的contenttype。当使用本机编码时,使用者有责任使用适当的解码器(例如,kafka使用者值反序列化器)来反序列化入站消息。另外,当使用本机编码和解码时,headermode=embeddedheaders属性将被忽略,消息中不会嵌入头。请参阅消费者财产使用记录。
q7solyqu2#
尝试将序列化程序设置为
io.confluent.kafka.serializers.KafkaAvroDeserializer