融合Kafkaavro系列和Spring云

pbgvytdp  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(328)

我正在尝试使用springcloud和kafkaavro序列化程序生成一个关于kafka的事件。
在my application.yml中,我有下面的配置,但是当序列化程序尝试生成消息时,它会以字节的形式生成,因为传递给kafkaserializer中getscheme方法的对象是字节数组,而不是genericord。我想我需要在springcloud中使用一个特定的messageconverter,但是我没有找到。

cloud:
stream:
  kafka:
    binder:
      brokers:
        - 'localhost:9092'
      useNativeDecoding: true
    bindings:
      Ptr-output:
          producer:
            configuration:
              schema.registry.url: 'http://localhost:8081'
              key.serializer: org.apache.kafka.common.serialization.StringSerializer
              value.serializer: com.abc.message.ptr.KafkaSerializer
  schemaRegistryClient:
    endpoint: 'http://localhost:8081'
  bindings:
     Ptr-output:
      contentType: application/*+avro
      destination: Ptr
  schema:
    avro:
      schema-locations: 'classpath:avro/Ptr.avsc'
      dynamic-schema-generation-enabled: false

我该怎么办?我该怎么解决?

gj3fmq9x

gj3fmq9x1#

请参阅文档,特别是您需要设置producer属性 useNativeEncoding .
使用的编码
当设置为true时,出站消息由客户机库直接序列化,客户机库必须进行相应配置(例如,设置适当的kafka生产者值序列化程序)。使用此配置时,出站消息封送不基于绑定的contenttype。当使用本机编码时,使用者有责任使用适当的解码器(例如,kafka使用者值反序列化器)来反序列化入站消息。另外,当使用本机编码和解码时,headermode=embeddedheaders属性将被忽略,消息中不会嵌入头。请参阅消费者财产使用记录。

q7solyqu

q7solyqu2#

尝试将序列化程序设置为 io.confluent.kafka.serializers.KafkaAvroDeserializer

相关问题